Skip to content

Commit f2ea862

Browse files
committed
Make sure qos, heartbeat, max channel are unsigned shorts
Sets the value to 0 or 65535 and issues a warning if it is out of range. Fixes #642
1 parent efb0590 commit f2ea862

File tree

5 files changed

+84
-37
lines changed

5 files changed

+84
-37
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

+22-6
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
2222
import com.rabbitmq.client.impl.recovery.RetryHandler;
2323
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
2426

2527
import javax.net.SocketFactory;
2628
import javax.net.ssl.SSLContext;
@@ -47,6 +49,8 @@
4749
*/
4850
public class ConnectionFactory implements Cloneable {
4951

52+
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionFactory.class);
53+
5054
private static final int MAX_UNSIGNED_SHORT = 65535;
5155

5256
/** Default user name */
@@ -393,10 +397,11 @@ public int getRequestedChannelMax() {
393397
* @param requestedChannelMax initially requested maximum channel number; zero for unlimited
394398
*/
395399
public void setRequestedChannelMax(int requestedChannelMax) {
396-
if (requestedChannelMax < 0 || requestedChannelMax > MAX_UNSIGNED_SHORT) {
397-
throw new IllegalArgumentException("Requested channel max must be between 0 and " + MAX_UNSIGNED_SHORT);
400+
this.requestedChannelMax = ensureUnsignedShort(requestedChannelMax);
401+
if (this.requestedChannelMax != requestedChannelMax) {
402+
LOGGER.warn("Requested channel max must be between 0 and {}, value has been set to {} instead of {}",
403+
MAX_UNSIGNED_SHORT, this.requestedChannelMax, requestedChannelMax);
398404
}
399-
this.requestedChannelMax = requestedChannelMax;
400405
}
401406

402407
/**
@@ -492,10 +497,11 @@ public int getShutdownTimeout() {
492497
* @see <a href="https://rabbitmq.com/heartbeats.html">RabbitMQ Heartbeats Guide</a>
493498
*/
494499
public void setRequestedHeartbeat(int requestedHeartbeat) {
495-
if (requestedHeartbeat < 0 || requestedHeartbeat > MAX_UNSIGNED_SHORT) {
496-
throw new IllegalArgumentException("Requested heartbeat must be between 0 and " + MAX_UNSIGNED_SHORT);
500+
this.requestedHeartbeat = ensureUnsignedShort(requestedHeartbeat);
501+
if (this.requestedHeartbeat != requestedHeartbeat) {
502+
LOGGER.warn("Requested heartbeat must be between 0 and {}, value has been set to {} instead of {}",
503+
MAX_UNSIGNED_SHORT, this.requestedHeartbeat, requestedHeartbeat);
497504
}
498-
this.requestedHeartbeat = requestedHeartbeat;
499505
}
500506

501507
/**
@@ -1574,4 +1580,14 @@ public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHa
15741580
public void setTrafficListener(TrafficListener trafficListener) {
15751581
this.trafficListener = trafficListener;
15761582
}
1583+
1584+
public static int ensureUnsignedShort(int value) {
1585+
if (value < 0) {
1586+
return 0;
1587+
} else if (value > MAX_UNSIGNED_SHORT) {
1588+
return MAX_UNSIGNED_SHORT;
1589+
} else {
1590+
return value;
1591+
}
1592+
}
15771593
}

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

+12-6
Original file line numberDiff line numberDiff line change
@@ -400,12 +400,15 @@ public void start()
400400
}
401401

402402
try {
403-
int channelMax =
403+
int negotiatedChannelMax =
404404
negotiateChannelMax(this.requestedChannelMax,
405405
connTune.getChannelMax());
406406

407-
if (!checkUnsignedShort(channelMax)) {
408-
throw new IllegalArgumentException("Negotiated channel max must be between 0 and " + MAX_UNSIGNED_SHORT + ": " + channelMax);
407+
int channelMax = ConnectionFactory.ensureUnsignedShort(negotiatedChannelMax);
408+
409+
if (channelMax != negotiatedChannelMax) {
410+
LOGGER.warn("Channel max must be between 0 and {}, value has been set to {} instead of {}",
411+
MAX_UNSIGNED_SHORT, channelMax, negotiatedChannelMax);
409412
}
410413

411414
_channelManager = instantiateChannelManager(channelMax, threadFactory);
@@ -415,12 +418,15 @@ public void start()
415418
connTune.getFrameMax());
416419
this._frameMax = frameMax;
417420

418-
int heartbeat =
421+
int negotiatedHeartbeat =
419422
negotiatedMaxValue(this.requestedHeartbeat,
420423
connTune.getHeartbeat());
421424

422-
if (!checkUnsignedShort(heartbeat)) {
423-
throw new IllegalArgumentException("Negotiated heartbeat must be between 0 and " + MAX_UNSIGNED_SHORT + ": " + heartbeat);
425+
int heartbeat = ConnectionFactory.ensureUnsignedShort(negotiatedHeartbeat);
426+
427+
if (heartbeat != negotiatedHeartbeat) {
428+
LOGGER.warn("Heartbeat must be between 0 and {}, value has been set to {} instead of {}",
429+
MAX_UNSIGNED_SHORT, heartbeat, negotiatedHeartbeat);
424430
}
425431

426432
setHeartbeat(heartbeat);

src/main/java/com/rabbitmq/client/impl/ChannelN.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -642,10 +642,12 @@ public AMQCommand transformReply(AMQCommand command) {
642642
public void basicQos(int prefetchSize, int prefetchCount, boolean global)
643643
throws IOException
644644
{
645-
if (prefetchCount < 0 || prefetchCount > MAX_UNSIGNED_SHORT) {
646-
throw new IllegalArgumentException("Prefetch count must be between 0 and " + MAX_UNSIGNED_SHORT);
645+
int unsignedShortPrefetchCount = ConnectionFactory.ensureUnsignedShort(prefetchCount);
646+
if (unsignedShortPrefetchCount != prefetchCount) {
647+
LOGGER.warn("Prefetch count must be between 0 and {}, value has been set to {} instead of {}",
648+
MAX_UNSIGNED_SHORT, unsignedShortPrefetchCount, prefetchCount);
647649
}
648-
exnWrappingRpc(new Basic.Qos(prefetchSize, prefetchCount, global));
650+
exnWrappingRpc(new Basic.Qos(prefetchSize, unsignedShortPrefetchCount, global));
649651
}
650652

651653
/** Public API - {@inheritDoc} */

src/test/java/com/rabbitmq/client/test/ChannelNTest.java

+27-5
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@
2525
import java.io.IOException;
2626
import java.util.concurrent.ExecutorService;
2727
import java.util.concurrent.Executors;
28+
import java.util.concurrent.atomic.AtomicReference;
2829
import java.util.stream.Stream;
2930

31+
import static org.assertj.core.api.Assertions.assertThat;
3032
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3133

3234
public class ChannelNTest {
@@ -64,23 +66,43 @@ public void callingBasicCancelForUnknownConsumerThrowsException() throws Excepti
6466
@Test
6567
public void qosShouldBeUnsignedShort() {
6668
AMQConnection connection = Mockito.mock(AMQConnection.class);
67-
ChannelN channel = new ChannelN(connection, 1, consumerWorkService);
69+
AtomicReference<com.rabbitmq.client.AMQP.Basic.Qos> qosMethod = new AtomicReference<>();
70+
ChannelN channel = new ChannelN(connection, 1, consumerWorkService) {
71+
@Override
72+
public AMQCommand exnWrappingRpc(Method m) {
73+
qosMethod.set((com.rabbitmq.client.AMQP.Basic.Qos) m);
74+
return null;
75+
}
76+
};
6877
class TestConfig {
6978
int value;
7079
Consumer call;
80+
int expected;
7181

72-
public TestConfig(int value, Consumer call) {
82+
public TestConfig(int value, Consumer call, int expected) {
7383
this.value = value;
7484
this.call = call;
85+
this.expected = expected;
7586
}
7687
}
7788
Consumer qos = value -> channel.basicQos(value);
7889
Consumer qosGlobal = value -> channel.basicQos(value, true);
7990
Consumer qosPrefetchSize = value -> channel.basicQos(10, value, true);
8091
Stream.of(
81-
new TestConfig(-1, qos), new TestConfig(65536, qos)
82-
).flatMap(config -> Stream.of(config, new TestConfig(config.value, qosGlobal), new TestConfig(config.value, qosPrefetchSize)))
83-
.forEach(config -> assertThatThrownBy(() -> config.call.apply(config.value)).isInstanceOf(IllegalArgumentException.class));
92+
new TestConfig(-1, qos, 0), new TestConfig(65536, qos, 65535),
93+
new TestConfig(10, qos, 10), new TestConfig(0, qos, 0)
94+
).flatMap(config -> Stream.of(config, new TestConfig(config.value, qosGlobal, config.expected), new TestConfig(config.value, qosPrefetchSize, config.expected)))
95+
.forEach(config -> {
96+
try {
97+
assertThat(qosMethod.get()).isNull();
98+
config.call.apply(config.value);
99+
assertThat(qosMethod.get()).isNotNull();
100+
assertThat(qosMethod.get().getPrefetchCount()).isEqualTo(config.expected);
101+
qosMethod.set(null);
102+
} catch (Exception e) {
103+
e.printStackTrace();
104+
}
105+
});
84106
}
85107

86108
interface Consumer {

src/test/java/com/rabbitmq/client/test/ConnectionFactoryTest.java

+18-17
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@
2727
import java.util.concurrent.atomic.AtomicBoolean;
2828
import java.util.concurrent.atomic.AtomicReference;
2929
import java.util.function.Consumer;
30+
import java.util.function.Supplier;
3031
import java.util.stream.Stream;
3132

3233
import static org.assertj.core.api.Assertions.assertThat;
33-
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3434
import static org.mockito.Mockito.*;
3535

3636
public class ConnectionFactoryTest {
@@ -164,33 +164,34 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() {
164164
public void heartbeatAndChannelMaxMustBeUnsignedShorts() {
165165
class TestConfig {
166166
int value;
167-
Consumer<Integer> call;
168-
boolean expectException;
167+
Supplier<Integer> getCall;
168+
Consumer<Integer> setCall;
169+
int expected;
169170

170-
public TestConfig(int value, Consumer<Integer> call, boolean expectException) {
171+
public TestConfig(int value, Supplier<Integer> getCall, Consumer<Integer> setCall, int expected) {
171172
this.value = value;
172-
this.call = call;
173-
this.expectException = expectException;
173+
this.getCall = getCall;
174+
this.setCall = setCall;
175+
this.expected = expected;
174176
}
175177
}
176178

177179
ConnectionFactory cf = new ConnectionFactory();
180+
Supplier<Integer> getHeartbeart = () -> cf.getRequestedHeartbeat();
178181
Consumer<Integer> setHeartbeat = cf::setRequestedHeartbeat;
182+
Supplier<Integer> getChannelMax = () -> cf.getRequestedChannelMax();
179183
Consumer<Integer> setChannelMax = cf::setRequestedChannelMax;
180184

181185
Stream.of(
182-
new TestConfig(0, setHeartbeat, false),
183-
new TestConfig(10, setHeartbeat, false),
184-
new TestConfig(65535, setHeartbeat, false),
185-
new TestConfig(-1, setHeartbeat, true),
186-
new TestConfig(65536, setHeartbeat, true))
187-
.flatMap(config -> Stream.of(config, new TestConfig(config.value, setChannelMax, config.expectException)))
186+
new TestConfig(0, getHeartbeart, setHeartbeat, 0),
187+
new TestConfig(10, getHeartbeart, setHeartbeat, 10),
188+
new TestConfig(65535, getHeartbeart, setHeartbeat, 65535),
189+
new TestConfig(-1, getHeartbeart, setHeartbeat, 0),
190+
new TestConfig(65536, getHeartbeart, setHeartbeat, 65535))
191+
.flatMap(config -> Stream.of(config, new TestConfig(config.value, getChannelMax, setChannelMax, config.expected)))
188192
.forEach(config -> {
189-
if (config.expectException) {
190-
assertThatThrownBy(() -> config.call.accept(config.value)).isInstanceOf(IllegalArgumentException.class);
191-
} else {
192-
config.call.accept(config.value);
193-
}
193+
config.setCall.accept(config.value);
194+
assertThat(config.getCall.get()).isEqualTo(config.expected);
194195
});
195196

196197
}

0 commit comments

Comments
 (0)