Skip to content

Commit b3edb1e

Browse files
committed
Handle URI query parameters in chain of responsibility
With fallback hook, by default empty. References #672
1 parent bb6c3cb commit b3edb1e

File tree

2 files changed

+68
-33
lines changed

2 files changed

+68
-33
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

+51-32
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import com.rabbitmq.client.impl.recovery.RetryHandler;
2323
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
2424

25+
import java.util.Map.Entry;
26+
import java.util.function.BiConsumer;
2527
import javax.net.SocketFactory;
2628
import javax.net.ssl.SSLContext;
2729
import javax.net.ssl.SSLSocketFactory;
@@ -382,6 +384,36 @@ private static String uriDecode(String s) {
382384
}
383385
}
384386

387+
private static final Map<String, BiConsumer<String, ConnectionFactory>> URI_QUERY_PARAMETER_HANDLERS =
388+
new HashMap<String, BiConsumer<String, ConnectionFactory>>() {
389+
{
390+
put("heartbeat", (value, cf) -> {
391+
try {
392+
int heartbeatInt = Integer.parseInt(value);
393+
cf.setRequestedHeartbeat(heartbeatInt);
394+
} catch (NumberFormatException e) {
395+
throw new IllegalArgumentException("Requested heartbeat must an integer");
396+
}
397+
});
398+
put("connection_timeout", (value, cf) -> {
399+
try {
400+
int connectionTimeoutInt = Integer.parseInt(value);
401+
cf.setConnectionTimeout(connectionTimeoutInt);
402+
} catch (NumberFormatException e) {
403+
throw new IllegalArgumentException("TCP connection timeout must an integer");
404+
}
405+
});
406+
put("channel_max", (value, cf) -> {
407+
try {
408+
int channelMaxInt = Integer.parseInt(value);
409+
cf.setRequestedChannelMax(channelMaxInt);
410+
} catch (NumberFormatException e) {
411+
throw new IllegalArgumentException("Requested channel max must an integer");
412+
}
413+
});
414+
}
415+
};
416+
385417
/**
386418
* Convenience method for setting some fields from query parameters
387419
* Will handle only a subset of the query parameters supported by the
@@ -391,7 +423,6 @@ private static String uriDecode(String s) {
391423
*/
392424
private void setQuery(String rawQuery) {
393425
Map<String, String> parameters = new HashMap<>();
394-
395426
// parsing the query parameters
396427
try {
397428
for (String param : rawQuery.split("&")) {
@@ -404,43 +435,31 @@ private void setQuery(String rawQuery) {
404435
parameters.put(key, value);
405436
}
406437
} catch (IOException e) {
407-
throw new RuntimeException("Cannot parse the query parameters", e);
438+
throw new IllegalArgumentException("Cannot parse the query parameters", e);
408439
}
409440

410-
// heartbeat
411-
String heartbeat = parameters.get("heartbeat");
412-
if (heartbeat != null) {
413-
try {
414-
int heartbeatInt = Integer.parseInt(heartbeat);
415-
setRequestedHeartbeat(heartbeatInt);
416-
} catch (NumberFormatException e) {
417-
throw new IllegalArgumentException("Requested heartbeat must an integer");
418-
}
419-
}
420-
421-
// connection_timeout
422-
String connectionTimeout = parameters.get("connection_timeout");
423-
if (connectionTimeout != null) {
424-
try {
425-
int connectionTimeoutInt = Integer.parseInt(connectionTimeout);
426-
setConnectionTimeout(connectionTimeoutInt);
427-
} catch (NumberFormatException e) {
428-
throw new IllegalArgumentException("TCP connection timeout must an integer");
429-
}
430-
}
431-
432-
// channel_max
433-
String channelMax = parameters.get("channel_max");
434-
if (channelMax != null) {
435-
try {
436-
int channelMaxInt = Integer.parseInt(channelMax);
437-
setRequestedChannelMax(channelMaxInt);
438-
} catch (NumberFormatException e) {
439-
throw new IllegalArgumentException("Requested channel max must an integer");
441+
for (Entry<String, String> entry : parameters.entrySet()) {
442+
BiConsumer<String, ConnectionFactory> handler = URI_QUERY_PARAMETER_HANDLERS
443+
.get(entry.getKey());
444+
if (handler != null) {
445+
handler.accept(entry.getValue(), this);
446+
} else {
447+
processUriQueryParameter(entry.getKey(), entry.getValue());
440448
}
441449
}
442450
}
443451

452+
/**
453+
* Hook to process query parameters not handled natively.
454+
* Handled natively: <code>heartbeat</code>, <code>connection_timeout</code>,
455+
* <code>channel_max</code>.
456+
* @param key
457+
* @param value
458+
*/
459+
protected void processUriQueryParameter(String key, String value) {
460+
461+
}
462+
444463
/**
445464
* Retrieve the requested maximum channel number
446465
* @return the initially requested maximum channel number; zero for unlimited

src/test/java/com/rabbitmq/client/test/AmqpUriTest.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,21 @@
1414
1515
package com.rabbitmq.client.test;
1616

17+
import static org.assertj.core.api.Assertions.assertThat;
1718
import static org.junit.Assert.assertEquals;
1819
import static org.junit.Assert.fail;
1920

2021
import java.net.URISyntaxException;
2122
import java.security.KeyManagementException;
2223
import java.security.NoSuchAlgorithmException;
2324

25+
import java.util.HashMap;
26+
import java.util.Map;
2427
import org.junit.Test;
2528

2629
import com.rabbitmq.client.ConnectionFactory;
2730

28-
public class AmqpUriTest extends BrokerTestCase
31+
public class AmqpUriTest
2932
{
3033
@Test public void uriParsing()
3134
throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException
@@ -109,6 +112,19 @@ public class AmqpUriTest extends BrokerTestCase
109112
parseFail("amqp://user:pass@host:10000/vhost?heartbeat=342?connection_timeout=442");
110113
}
111114

115+
@Test
116+
public void processUriQueryParameterShouldBeCalledForNotHandledParameter() throws Exception {
117+
Map<String, String> processedParameters = new HashMap<>();
118+
ConnectionFactory cf = new ConnectionFactory() {
119+
@Override
120+
protected void processUriQueryParameter(String key, String value) {
121+
processedParameters.put(key, value);
122+
}
123+
};
124+
cf.setUri("amqp://user:pass@host:10000/vhost?heartbeat=60&key=value");
125+
assertThat(processedParameters).hasSize(1).containsEntry("key", "value");
126+
}
127+
112128
private void parseSuccess(String uri, String user, String password,
113129
String host, int port, String vhost, boolean secured)
114130
throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException

0 commit comments

Comments
 (0)