Skip to content

Commit 14d9c1f

Browse files
committed
Check qos, heartbeat, max channel are unsigned shorts
To avoid truncation and subtle bugs. Fixes #640 (cherry picked from commit 733788e) Conflicts: src/test/java/com/rabbitmq/client/test/ConnectionFactoryTest.java
1 parent 29dc1c8 commit 14d9c1f

File tree

8 files changed

+163
-69
lines changed

8 files changed

+163
-69
lines changed

src/main/java/com/rabbitmq/client/Channel.java

+20-13
Original file line numberDiff line numberDiff line change
@@ -197,42 +197,49 @@ public interface Channel extends ShutdownNotifier, AutoCloseable {
197197

198198
/**
199199
* Request specific "quality of service" settings.
200-
*
200+
* <p>
201201
* These settings impose limits on the amount of data the server
202202
* will deliver to consumers before requiring acknowledgements.
203203
* Thus they provide a means of consumer-initiated flow control.
204-
* @see com.rabbitmq.client.AMQP.Basic.Qos
205-
* @param prefetchSize maximum amount of content (measured in
206-
* octets) that the server will deliver, 0 if unlimited
204+
* <p>
205+
* Note the prefetch count must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
206+
*
207+
* @param prefetchSize maximum amount of content (measured in
208+
* octets) that the server will deliver, 0 if unlimited
207209
* @param prefetchCount maximum number of messages that the server
208-
* will deliver, 0 if unlimited
209-
* @param global true if the settings should be applied to the
210-
* entire channel rather than each consumer
210+
* will deliver, 0 if unlimited
211+
* @param global true if the settings should be applied to the
212+
* entire channel rather than each consumer
211213
* @throws java.io.IOException if an error is encountered
214+
* @see com.rabbitmq.client.AMQP.Basic.Qos
212215
*/
213216
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
214217

215218
/**
216219
* Request a specific prefetchCount "quality of service" settings
217220
* for this channel.
221+
* <p>
222+
* Note the prefetch count must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
218223
*
219-
* @see #basicQos(int, int, boolean)
220224
* @param prefetchCount maximum number of messages that the server
221-
* will deliver, 0 if unlimited
222-
* @param global true if the settings should be applied to the
223-
* entire channel rather than each consumer
225+
* will deliver, 0 if unlimited
226+
* @param global true if the settings should be applied to the
227+
* entire channel rather than each consumer
224228
* @throws java.io.IOException if an error is encountered
229+
* @see #basicQos(int, int, boolean)
225230
*/
226231
void basicQos(int prefetchCount, boolean global) throws IOException;
227232

228233
/**
229234
* Request a specific prefetchCount "quality of service" settings
230235
* for this channel.
236+
* <p>
237+
* Note the prefetch count must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
231238
*
232-
* @see #basicQos(int, int, boolean)
233239
* @param prefetchCount maximum number of messages that the server
234-
* will deliver, 0 if unlimited
240+
* will deliver, 0 if unlimited
235241
* @throws java.io.IOException if an error is encountered
242+
* @see #basicQos(int, int, boolean)
236243
*/
237244
void basicQos(int prefetchCount) throws IOException;
238245

src/main/java/com/rabbitmq/client/ConnectionFactory.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
*/
4848
public class ConnectionFactory implements Cloneable {
4949

50+
private static final int MAX_UNSIGNED_SHORT = 65535;
51+
5052
/** Default user name */
5153
public static final String DEFAULT_USER = "guest";
5254
/** Default password */
@@ -384,10 +386,16 @@ public int getRequestedChannelMax() {
384386
}
385387

386388
/**
387-
* Set the requested maximum channel number
389+
* Set the requested maximum channel number.
390+
* <p>
391+
* Note the value must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
392+
*
388393
* @param requestedChannelMax initially requested maximum channel number; zero for unlimited
389394
*/
390395
public void setRequestedChannelMax(int requestedChannelMax) {
396+
if (requestedChannelMax < 0 || requestedChannelMax > MAX_UNSIGNED_SHORT) {
397+
throw new IllegalArgumentException("Requested channel max must be between 0 and " + MAX_UNSIGNED_SHORT);
398+
}
391399
this.requestedChannelMax = requestedChannelMax;
392400
}
393401

@@ -477,10 +485,16 @@ public int getShutdownTimeout() {
477485
* Set the requested heartbeat timeout. Heartbeat frames will be sent at about 1/2 the timeout interval.
478486
* If server heartbeat timeout is configured to a non-zero value, this method can only be used
479487
* to lower the value; otherwise any value provided by the client will be used.
488+
* <p>
489+
* Note the value must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
490+
*
480491
* @param requestedHeartbeat the initially requested heartbeat timeout, in seconds; zero for none
481492
* @see <a href="https://rabbitmq.com/heartbeats.html">RabbitMQ Heartbeats Guide</a>
482493
*/
483494
public void setRequestedHeartbeat(int requestedHeartbeat) {
495+
if (requestedHeartbeat < 0 || requestedHeartbeat > MAX_UNSIGNED_SHORT) {
496+
throw new IllegalArgumentException("Requested heartbeat must be between 0 and " + MAX_UNSIGNED_SHORT);
497+
}
484498
this.requestedHeartbeat = requestedHeartbeat;
485499
}
486500

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,12 @@
1515

1616
package com.rabbitmq.client.impl;
1717

18-
import com.rabbitmq.client.*;
1918
import com.rabbitmq.client.Method;
19+
import com.rabbitmq.client.*;
2020
import com.rabbitmq.client.impl.AMQChannel.BlockingRpcContinuation;
2121
import com.rabbitmq.client.impl.recovery.RecoveryCanBeginListener;
2222
import com.rabbitmq.utility.BlockingCell;
2323
import com.rabbitmq.utility.Utility;
24-
2524
import org.slf4j.Logger;
2625
import org.slf4j.LoggerFactory;
2726

@@ -47,6 +46,8 @@ final class Copyright {
4746
*/
4847
public class AMQConnection extends ShutdownNotifierComponent implements Connection, NetworkConnection {
4948

49+
private static final int MAX_UNSIGNED_SHORT = 65535;
50+
5051
private static final Logger LOGGER = LoggerFactory.getLogger(AMQConnection.class);
5152
// we want socket write and channel shutdown timeouts to kick in after
5253
// the heartbeat one, so we use a value of 105% of the effective heartbeat timeout
@@ -402,6 +403,11 @@ public void start()
402403
int channelMax =
403404
negotiateChannelMax(this.requestedChannelMax,
404405
connTune.getChannelMax());
406+
407+
if (!checkUnsignedShort(channelMax)) {
408+
throw new IllegalArgumentException("Negotiated channel max must be between 0 and " + MAX_UNSIGNED_SHORT + ": " + channelMax);
409+
}
410+
405411
_channelManager = instantiateChannelManager(channelMax, threadFactory);
406412

407413
int frameMax =
@@ -413,6 +419,10 @@ public void start()
413419
negotiatedMaxValue(this.requestedHeartbeat,
414420
connTune.getHeartbeat());
415421

422+
if (!checkUnsignedShort(heartbeat)) {
423+
throw new IllegalArgumentException("Negotiated heartbeat must be between 0 and " + MAX_UNSIGNED_SHORT + ": " + heartbeat);
424+
}
425+
416426
setHeartbeat(heartbeat);
417427

418428
_channel0.transmit(new AMQP.Connection.TuneOk.Builder()
@@ -629,6 +639,10 @@ private static int negotiatedMaxValue(int clientValue, int serverValue) {
629639
Math.min(clientValue, serverValue);
630640
}
631641

642+
private static boolean checkUnsignedShort(int value) {
643+
return value >= 0 && value <= MAX_UNSIGNED_SHORT;
644+
}
645+
632646
private class MainLoop implements Runnable {
633647

634648
/**

src/main/java/com/rabbitmq/client/impl/ChannelN.java

+15-17
Original file line numberDiff line numberDiff line change
@@ -15,30 +15,24 @@
1515

1616
package com.rabbitmq.client.impl;
1717

18-
import java.io.IOException;
19-
import java.util.Collection;
20-
import java.util.Collections;
21-
import java.util.HashMap;
22-
import java.util.Map;
23-
import java.util.SortedSet;
24-
import java.util.TreeSet;
25-
import java.util.concurrent.*;
26-
27-
import com.rabbitmq.client.ConfirmCallback;
2818
import com.rabbitmq.client.*;
29-
import com.rabbitmq.client.AMQP.BasicProperties;
19+
import com.rabbitmq.client.Connection;
3020
import com.rabbitmq.client.Method;
31-
import com.rabbitmq.client.impl.AMQImpl.Basic;
21+
import com.rabbitmq.client.AMQP.BasicProperties;
3222
import com.rabbitmq.client.impl.AMQImpl.Channel;
33-
import com.rabbitmq.client.impl.AMQImpl.Confirm;
34-
import com.rabbitmq.client.impl.AMQImpl.Exchange;
3523
import com.rabbitmq.client.impl.AMQImpl.Queue;
36-
import com.rabbitmq.client.impl.AMQImpl.Tx;
24+
import com.rabbitmq.client.impl.AMQImpl.*;
3725
import com.rabbitmq.utility.Utility;
38-
3926
import org.slf4j.Logger;
4027
import org.slf4j.LoggerFactory;
4128

29+
import java.io.IOException;
30+
import java.util.*;
31+
import java.util.concurrent.CompletableFuture;
32+
import java.util.concurrent.CopyOnWriteArrayList;
33+
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.TimeoutException;
35+
4236
/**
4337
* Main interface to AMQP protocol functionality. Public API -
4438
* Implementation of all AMQChannels except channel zero.
@@ -50,6 +44,7 @@
5044
* </pre>
5145
*/
5246
public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel {
47+
private static final int MAX_UNSIGNED_SHORT = 65535;
5348
private static final String UNSPECIFIED_OUT_OF_BAND = "";
5449
private static final Logger LOGGER = LoggerFactory.getLogger(ChannelN.class);
5550

@@ -647,7 +642,10 @@ public AMQCommand transformReply(AMQCommand command) {
647642
public void basicQos(int prefetchSize, int prefetchCount, boolean global)
648643
throws IOException
649644
{
650-
exnWrappingRpc(new Basic.Qos(prefetchSize, prefetchCount, global));
645+
if (prefetchCount < 0 || prefetchCount > MAX_UNSIGNED_SHORT) {
646+
throw new IllegalArgumentException("Prefetch count must be between 0 and " + MAX_UNSIGNED_SHORT);
647+
}
648+
exnWrappingRpc(new Basic.Qos(prefetchSize, prefetchCount, global));
651649
}
652650

653651
/** Public API - {@inheritDoc} */

src/test/java/com/rabbitmq/client/test/ChannelNTest.java

+31
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
import java.io.IOException;
2626
import java.util.concurrent.ExecutorService;
2727
import java.util.concurrent.Executors;
28+
import java.util.stream.Stream;
29+
30+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2831

2932
public class ChannelNTest {
3033

@@ -58,4 +61,32 @@ public void callingBasicCancelForUnknownConsumerThrowsException() throws Excepti
5861
channel.basicCancel("does-not-exist");
5962
}
6063

64+
@Test
65+
public void qosShouldBeUnsignedShort() {
66+
AMQConnection connection = Mockito.mock(AMQConnection.class);
67+
ChannelN channel = new ChannelN(connection, 1, consumerWorkService);
68+
class TestConfig {
69+
int value;
70+
Consumer call;
71+
72+
public TestConfig(int value, Consumer call) {
73+
this.value = value;
74+
this.call = call;
75+
}
76+
}
77+
Consumer qos = value -> channel.basicQos(value);
78+
Consumer qosGlobal = value -> channel.basicQos(value, true);
79+
Consumer qosPrefetchSize = value -> channel.basicQos(10, value, true);
80+
Stream.of(
81+
new TestConfig(-1, qos), new TestConfig(65536, qos)
82+
).flatMap(config -> Stream.of(config, new TestConfig(config.value, qosGlobal), new TestConfig(config.value, qosPrefetchSize)))
83+
.forEach(config -> assertThatThrownBy(() -> config.call.apply(config.value)).isInstanceOf(IllegalArgumentException.class));
84+
}
85+
86+
interface Consumer {
87+
88+
void apply(int value) throws Exception;
89+
90+
}
91+
6192
}

src/test/java/com/rabbitmq/client/test/ClientTests.java

-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@
5757
ConnectionFactoryTest.class,
5858
RecoveryAwareAMQConnectionFactoryTest.class,
5959
RpcTest.class,
60-
SslContextFactoryTest.class,
6160
LambdaCallbackTest.class,
6261
ChannelAsyncCompletableFutureTest.class,
6362
RecoveryDelayHandlerTest.class,

0 commit comments

Comments
 (0)