Skip to content

Commit ac8b6f0

Browse files
Merge pull request #641 from rabbitmq/rabbitmq-java-client-640-check-unsigned-short
Check qos, heartbeat, max channel are unsigned shorts
2 parents 3bc7ee6 + 733788e commit ac8b6f0

File tree

8 files changed

+163
-71
lines changed

8 files changed

+163
-71
lines changed

src/main/java/com/rabbitmq/client/Channel.java

+20-13
Original file line numberDiff line numberDiff line change
@@ -193,42 +193,49 @@ public interface Channel extends ShutdownNotifier, AutoCloseable {
193193

194194
/**
195195
* Request specific "quality of service" settings.
196-
*
196+
* <p>
197197
* These settings impose limits on the amount of data the server
198198
* will deliver to consumers before requiring acknowledgements.
199199
* Thus they provide a means of consumer-initiated flow control.
200-
* @see com.rabbitmq.client.AMQP.Basic.Qos
201-
* @param prefetchSize maximum amount of content (measured in
202-
* octets) that the server will deliver, 0 if unlimited
200+
* <p>
201+
* Note the prefetch count must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
202+
*
203+
* @param prefetchSize maximum amount of content (measured in
204+
* octets) that the server will deliver, 0 if unlimited
203205
* @param prefetchCount maximum number of messages that the server
204-
* will deliver, 0 if unlimited
205-
* @param global true if the settings should be applied to the
206-
* entire channel rather than each consumer
206+
* will deliver, 0 if unlimited
207+
* @param global true if the settings should be applied to the
208+
* entire channel rather than each consumer
207209
* @throws java.io.IOException if an error is encountered
210+
* @see com.rabbitmq.client.AMQP.Basic.Qos
208211
*/
209212
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
210213

211214
/**
212215
* Request a specific prefetchCount "quality of service" settings
213216
* for this channel.
217+
* <p>
218+
* Note the prefetch count must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
214219
*
215-
* @see #basicQos(int, int, boolean)
216220
* @param prefetchCount maximum number of messages that the server
217-
* will deliver, 0 if unlimited
218-
* @param global true if the settings should be applied to the
219-
* entire channel rather than each consumer
221+
* will deliver, 0 if unlimited
222+
* @param global true if the settings should be applied to the
223+
* entire channel rather than each consumer
220224
* @throws java.io.IOException if an error is encountered
225+
* @see #basicQos(int, int, boolean)
221226
*/
222227
void basicQos(int prefetchCount, boolean global) throws IOException;
223228

224229
/**
225230
* Request a specific prefetchCount "quality of service" settings
226231
* for this channel.
232+
* <p>
233+
* Note the prefetch count must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
227234
*
228-
* @see #basicQos(int, int, boolean)
229235
* @param prefetchCount maximum number of messages that the server
230-
* will deliver, 0 if unlimited
236+
* will deliver, 0 if unlimited
231237
* @throws java.io.IOException if an error is encountered
238+
* @see #basicQos(int, int, boolean)
232239
*/
233240
void basicQos(int prefetchCount) throws IOException;
234241

src/main/java/com/rabbitmq/client/ConnectionFactory.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
*/
4848
public class ConnectionFactory implements Cloneable {
4949

50+
private static final int MAX_UNSIGNED_SHORT = 65535;
51+
5052
/** Default user name */
5153
public static final String DEFAULT_USER = "guest";
5254
/** Default password */
@@ -384,10 +386,16 @@ public int getRequestedChannelMax() {
384386
}
385387

386388
/**
387-
* Set the requested maximum channel number
389+
* Set the requested maximum channel number.
390+
* <p>
391+
* Note the value must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
392+
*
388393
* @param requestedChannelMax initially requested maximum channel number; zero for unlimited
389394
*/
390395
public void setRequestedChannelMax(int requestedChannelMax) {
396+
if (requestedChannelMax < 0 || requestedChannelMax > MAX_UNSIGNED_SHORT) {
397+
throw new IllegalArgumentException("Requested channel max must be between 0 and " + MAX_UNSIGNED_SHORT);
398+
}
391399
this.requestedChannelMax = requestedChannelMax;
392400
}
393401

@@ -477,10 +485,16 @@ public int getShutdownTimeout() {
477485
* Set the requested heartbeat timeout. Heartbeat frames will be sent at about 1/2 the timeout interval.
478486
* If server heartbeat timeout is configured to a non-zero value, this method can only be used
479487
* to lower the value; otherwise any value provided by the client will be used.
488+
* <p>
489+
* Note the value must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
490+
*
480491
* @param requestedHeartbeat the initially requested heartbeat timeout, in seconds; zero for none
481492
* @see <a href="https://rabbitmq.com/heartbeats.html">RabbitMQ Heartbeats Guide</a>
482493
*/
483494
public void setRequestedHeartbeat(int requestedHeartbeat) {
495+
if (requestedHeartbeat < 0 || requestedHeartbeat > MAX_UNSIGNED_SHORT) {
496+
throw new IllegalArgumentException("Requested heartbeat must be between 0 and " + MAX_UNSIGNED_SHORT);
497+
}
484498
this.requestedHeartbeat = requestedHeartbeat;
485499
}
486500

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,12 @@
1515

1616
package com.rabbitmq.client.impl;
1717

18-
import com.rabbitmq.client.*;
1918
import com.rabbitmq.client.Method;
19+
import com.rabbitmq.client.*;
2020
import com.rabbitmq.client.impl.AMQChannel.BlockingRpcContinuation;
2121
import com.rabbitmq.client.impl.recovery.RecoveryCanBeginListener;
2222
import com.rabbitmq.utility.BlockingCell;
2323
import com.rabbitmq.utility.Utility;
24-
2524
import org.slf4j.Logger;
2625
import org.slf4j.LoggerFactory;
2726

@@ -47,6 +46,8 @@ final class Copyright {
4746
*/
4847
public class AMQConnection extends ShutdownNotifierComponent implements Connection, NetworkConnection {
4948

49+
private static final int MAX_UNSIGNED_SHORT = 65535;
50+
5051
private static final Logger LOGGER = LoggerFactory.getLogger(AMQConnection.class);
5152
// we want socket write and channel shutdown timeouts to kick in after
5253
// the heartbeat one, so we use a value of 105% of the effective heartbeat timeout
@@ -399,6 +400,11 @@ public void start()
399400
int channelMax =
400401
negotiateChannelMax(this.requestedChannelMax,
401402
connTune.getChannelMax());
403+
404+
if (!checkUnsignedShort(channelMax)) {
405+
throw new IllegalArgumentException("Negotiated channel max must be between 0 and " + MAX_UNSIGNED_SHORT + ": " + channelMax);
406+
}
407+
402408
_channelManager = instantiateChannelManager(channelMax, threadFactory);
403409

404410
int frameMax =
@@ -410,6 +416,10 @@ public void start()
410416
negotiatedMaxValue(this.requestedHeartbeat,
411417
connTune.getHeartbeat());
412418

419+
if (!checkUnsignedShort(heartbeat)) {
420+
throw new IllegalArgumentException("Negotiated heartbeat must be between 0 and " + MAX_UNSIGNED_SHORT + ": " + heartbeat);
421+
}
422+
413423
setHeartbeat(heartbeat);
414424

415425
_channel0.transmit(new AMQP.Connection.TuneOk.Builder()
@@ -626,6 +636,10 @@ private static int negotiatedMaxValue(int clientValue, int serverValue) {
626636
Math.min(clientValue, serverValue);
627637
}
628638

639+
private static boolean checkUnsignedShort(int value) {
640+
return value >= 0 && value <= MAX_UNSIGNED_SHORT;
641+
}
642+
629643
private class MainLoop implements Runnable {
630644

631645
/**

src/main/java/com/rabbitmq/client/impl/ChannelN.java

+15-17
Original file line numberDiff line numberDiff line change
@@ -15,30 +15,24 @@
1515

1616
package com.rabbitmq.client.impl;
1717

18-
import java.io.IOException;
19-
import java.util.Collection;
20-
import java.util.Collections;
21-
import java.util.HashMap;
22-
import java.util.Map;
23-
import java.util.SortedSet;
24-
import java.util.TreeSet;
25-
import java.util.concurrent.*;
26-
27-
import com.rabbitmq.client.ConfirmCallback;
2818
import com.rabbitmq.client.*;
29-
import com.rabbitmq.client.AMQP.BasicProperties;
19+
import com.rabbitmq.client.Connection;
3020
import com.rabbitmq.client.Method;
31-
import com.rabbitmq.client.impl.AMQImpl.Basic;
21+
import com.rabbitmq.client.AMQP.BasicProperties;
3222
import com.rabbitmq.client.impl.AMQImpl.Channel;
33-
import com.rabbitmq.client.impl.AMQImpl.Confirm;
34-
import com.rabbitmq.client.impl.AMQImpl.Exchange;
3523
import com.rabbitmq.client.impl.AMQImpl.Queue;
36-
import com.rabbitmq.client.impl.AMQImpl.Tx;
24+
import com.rabbitmq.client.impl.AMQImpl.*;
3725
import com.rabbitmq.utility.Utility;
38-
3926
import org.slf4j.Logger;
4027
import org.slf4j.LoggerFactory;
4128

29+
import java.io.IOException;
30+
import java.util.*;
31+
import java.util.concurrent.CompletableFuture;
32+
import java.util.concurrent.CopyOnWriteArrayList;
33+
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.TimeoutException;
35+
4236
/**
4337
* Main interface to AMQP protocol functionality. Public API -
4438
* Implementation of all AMQChannels except channel zero.
@@ -50,6 +44,7 @@
5044
* </pre>
5145
*/
5246
public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel {
47+
private static final int MAX_UNSIGNED_SHORT = 65535;
5348
private static final String UNSPECIFIED_OUT_OF_BAND = "";
5449
private static final Logger LOGGER = LoggerFactory.getLogger(ChannelN.class);
5550

@@ -647,7 +642,10 @@ public AMQCommand transformReply(AMQCommand command) {
647642
public void basicQos(int prefetchSize, int prefetchCount, boolean global)
648643
throws IOException
649644
{
650-
exnWrappingRpc(new Basic.Qos(prefetchSize, prefetchCount, global));
645+
if (prefetchCount < 0 || prefetchCount > MAX_UNSIGNED_SHORT) {
646+
throw new IllegalArgumentException("Prefetch count must be between 0 and " + MAX_UNSIGNED_SHORT);
647+
}
648+
exnWrappingRpc(new Basic.Qos(prefetchSize, prefetchCount, global));
651649
}
652650

653651
/** Public API - {@inheritDoc} */

src/test/java/com/rabbitmq/client/test/ChannelNTest.java

+31
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424

2525
import java.util.concurrent.ExecutorService;
2626
import java.util.concurrent.Executors;
27+
import java.util.stream.Stream;
28+
29+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2730

2831
public class ChannelNTest {
2932

@@ -57,4 +60,32 @@ public void callingBasicCancelForUnknownConsumerDoesNotThrowException() throws E
5760
channel.basicCancel("does-not-exist");
5861
}
5962

63+
@Test
64+
public void qosShouldBeUnsignedShort() {
65+
AMQConnection connection = Mockito.mock(AMQConnection.class);
66+
ChannelN channel = new ChannelN(connection, 1, consumerWorkService);
67+
class TestConfig {
68+
int value;
69+
Consumer call;
70+
71+
public TestConfig(int value, Consumer call) {
72+
this.value = value;
73+
this.call = call;
74+
}
75+
}
76+
Consumer qos = value -> channel.basicQos(value);
77+
Consumer qosGlobal = value -> channel.basicQos(value, true);
78+
Consumer qosPrefetchSize = value -> channel.basicQos(10, value, true);
79+
Stream.of(
80+
new TestConfig(-1, qos), new TestConfig(65536, qos)
81+
).flatMap(config -> Stream.of(config, new TestConfig(config.value, qosGlobal), new TestConfig(config.value, qosPrefetchSize)))
82+
.forEach(config -> assertThatThrownBy(() -> config.call.apply(config.value)).isInstanceOf(IllegalArgumentException.class));
83+
}
84+
85+
interface Consumer {
86+
87+
void apply(int value) throws Exception;
88+
89+
}
90+
6091
}

src/test/java/com/rabbitmq/client/test/ClientTests.java

-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
ConnectionFactoryTest.class,
5353
RecoveryAwareAMQConnectionFactoryTest.class,
5454
RpcTest.class,
55-
SslContextFactoryTest.class,
5655
LambdaCallbackTest.class,
5756
ChannelAsyncCompletableFutureTest.class,
5857
RecoveryDelayHandlerTest.class,

0 commit comments

Comments
 (0)