Skip to content

Commit be7540d

Browse files
author
Brandon Shroyer
committed
Overload ConnectionFactory.newConnection methods to use lists as well as arrays.
* Public newConnection() methods that took arrays as inputs are still present, but wrap around list-based invocations. Addresses [issue #125](#125).
1 parent 16d34ab commit be7540d

File tree

6 files changed

+125
-63
lines changed

6 files changed

+125
-63
lines changed

build.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@
8787
<filter token="VERSION" value="${impl.version}"/>
8888
</filterset>
8989
</copy>
90-
<javac destdir="${javac.out}"
90+
<javac
91+
destdir="${javac.out}"
9192
classpathref="javac.classpath"
9293
source="${standard.javac.source}"
9394
target="${standard.javac.target}"

src/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import java.security.NoSuchAlgorithmException;
2222
import java.util.Map;
2323
import java.util.concurrent.*;
24+
import java.util.List;
25+
import java.util.Arrays;
2426

2527
import java.net.URI;
2628
import java.net.URISyntaxException;
@@ -641,7 +643,23 @@ protected FrameHandlerFactory createFrameHandlerFactory() throws IOException {
641643
* @throws IOException if it encounters a problem
642644
*/
643645
public Connection newConnection(Address[] addrs) throws IOException, TimeoutException {
644-
return newConnection(this.sharedExecutor, addrs);
646+
return newConnection(this.sharedExecutor, Arrays.asList(addrs));
647+
}
648+
649+
/**
650+
* Create a new broker connection, picking the first available address from
651+
* the list.
652+
*
653+
* If <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>
654+
* is enabled, the connection returned by this method will be {@link Recoverable}. Future
655+
* reconnection attempts will pick a random accessible address from the provided list.
656+
*
657+
* @param addr_list a List of known broker addresses (hostname/port pairs) to try in order
658+
* @return an interface to the connection
659+
* @throws IOException if it encounters a problem
660+
*/
661+
public Connection newConnection(List<Address> addr_list) throws IOException, TimeoutException {
662+
return newConnection(this.sharedExecutor, addr_list);
645663
}
646664

647665
/**
@@ -658,20 +676,39 @@ public Connection newConnection(Address[] addrs) throws IOException, TimeoutExce
658676
* @throws java.io.IOException if it encounters a problem
659677
* @see <a href="http://www.rabbitmq.com/api-guide.html#recovery">Automatic Recovery</a>
660678
*/
661-
public Connection newConnection(ExecutorService executor, Address[] addrs)
679+
public Connection newConnection(ExecutorService executor, Address[] addrs) throws IOException, TimeoutException {
680+
return newConnection(executor, Arrays.asList(addrs));
681+
}
682+
683+
/**
684+
* Create a new broker connection, picking the first available address from
685+
* the list.
686+
*
687+
* If <a href="http://www.rabbitmq.com/api-guide.html#recovery">automatic connection recovery</a>
688+
* is enabled, the connection returned by this method will be {@link Recoverable}. Future
689+
* reconnection attempts will pick a random accessible address from the provided list.
690+
*
691+
* @param executor thread execution service for consumers on the connection
692+
* @param addr_list a List of known broker addresses (hostname/port pairs) to try in order
693+
* @return an interface to the connection
694+
* @throws java.io.IOException if it encounters a problem
695+
* @see <a href="http://www.rabbitmq.com/api-guide.html#recovery">Automatic Recovery</a>
696+
*/
697+
public Connection newConnection(ExecutorService executor, List<Address> addr_list)
662698
throws IOException, TimeoutException {
663699
// make sure we respect the provided thread factory
664700
FrameHandlerFactory fhFactory = createFrameHandlerFactory();
665701
ConnectionParams params = params(executor);
666702

667703
if (isAutomaticRecoveryEnabled()) {
668704
// see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
669-
AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addrs);
705+
AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addr_list);
706+
670707
conn.init();
671708
return conn;
672709
} else {
673710
IOException lastException = null;
674-
for (Address addr : addrs) {
711+
for (Address addr : addr_list) {
675712
try {
676713
FrameHandler handler = fhFactory.create(addr);
677714
AMQConnection conn = new AMQConnection(params, handler);
@@ -719,9 +756,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
719756
* @throws IOException if it encounters a problem
720757
*/
721758
public Connection newConnection() throws IOException, TimeoutException {
722-
return newConnection(this.sharedExecutor,
723-
new Address[] {new Address(getHost(), getPort())}
724-
);
759+
return newConnection(this.sharedExecutor, Arrays.asList(new Address(getHost(), getPort())));
725760
}
726761

727762
/**
@@ -736,9 +771,7 @@ public Connection newConnection() throws IOException, TimeoutException {
736771
* @throws IOException if it encounters a problem
737772
*/
738773
public Connection newConnection(ExecutorService executor) throws IOException, TimeoutException {
739-
return newConnection(executor,
740-
new Address[] {new Address(getHost(), getPort())}
741-
);
774+
return newConnection(executor, Arrays.asList(new Address(getHost(), getPort())));
742775
}
743776

744777
@Override public ConnectionFactory clone(){

src/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,13 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f,
8282
this.channels = new ConcurrentHashMap<Integer, AutorecoveringChannel>();
8383
}
8484

85+
public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, List<Address> addr_list) {
86+
this.cf = new RecoveryAwareAMQConnectionFactory(params, f, addr_list);
87+
this.params = params;
88+
89+
this.channels = new ConcurrentHashMap<Integer, AutorecoveringChannel>();
90+
}
91+
8592
/**
8693
* Private API.
8794
* @throws IOException

src/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java

Lines changed: 39 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -13,42 +13,46 @@
1313
import java.util.concurrent.TimeoutException;
1414

1515
public class RecoveryAwareAMQConnectionFactory {
16-
private final ConnectionParams params;
17-
private final FrameHandlerFactory factory;
18-
private final Address[] addrs;
19-
20-
public RecoveryAwareAMQConnectionFactory(ConnectionParams params, FrameHandlerFactory factory, Address[] addrs) {
21-
this.params = params;
22-
this.factory = factory;
23-
this.addrs = addrs;
24-
}
16+
private final ConnectionParams params;
17+
private final FrameHandlerFactory factory;
18+
private final List<Address> addrs;
2519

26-
/**
27-
* @return an interface to the connection
28-
* @throws java.io.IOException if it encounters a problem
29-
*/
30-
RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException {
31-
IOException lastException = null;
32-
Address[] shuffled = shuffle(addrs);
33-
for (Address addr : shuffled) {
34-
try {
35-
FrameHandler frameHandler = factory.create(addr);
36-
RecoveryAwareAMQConnection conn = new RecoveryAwareAMQConnection(params, frameHandler);
37-
conn.start();
38-
return conn;
39-
} catch (IOException e) {
40-
lastException = e;
41-
}
42-
}
43-
44-
throw (lastException != null) ? lastException : new IOException("failed to connect");
45-
}
20+
public RecoveryAwareAMQConnectionFactory(ConnectionParams params, FrameHandlerFactory factory, Address[] addrs) {
21+
this.params = params;
22+
this.factory = factory;
23+
this.addrs = Arrays.asList(addrs);
24+
}
25+
26+
public RecoveryAwareAMQConnectionFactory(ConnectionParams params, FrameHandlerFactory factory, List<Address> addrs){
27+
this.params = params;
28+
this.factory = factory;
29+
this.addrs = addrs;
30+
}
31+
/**
32+
* @return an interface to the connection
33+
* @throws java.io.IOException if it encounters a problem
34+
*/
35+
RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException {
36+
IOException lastException = null;
37+
List<Address> shuffled = shuffle(addrs);
4638

47-
private Address[] shuffle(Address[] addrs) {
48-
List<Address> list = new ArrayList<Address>(Arrays.asList(addrs));
49-
Collections.shuffle(list);
50-
Address[] result = new Address[addrs.length];
51-
list.toArray(result);
52-
return result;
39+
for (Address addr : shuffled) {
40+
try {
41+
FrameHandler frameHandler = factory.create(addr);
42+
RecoveryAwareAMQConnection conn = new RecoveryAwareAMQConnection(params, frameHandler);
43+
conn.start();
44+
return conn;
45+
} catch (IOException e) {
46+
lastException = e;
47+
}
5348
}
49+
50+
throw (lastException != null) ? lastException : new IOException("failed to connect");
51+
}
52+
53+
private List<Address> shuffle(List<Address> addrs) {
54+
List<Address> list = new ArrayList<Address>(addrs);
55+
Collections.shuffle(list);
56+
return list;
57+
}
5458
}

test/src/com/rabbitmq/client/test/functional/ConnectionRecovery.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.rabbitmq.tools.Host;
1212

1313
import java.io.IOException;
14+
import java.util.Arrays;
1415
import java.util.List;
1516
import java.util.ArrayList;
1617
import java.util.UUID;
@@ -35,7 +36,7 @@ public void testConnectionRecoveryWithServerRestart() throws IOException, Interr
3536
assertTrue(connection.isOpen());
3637
}
3738

38-
public void testConnectionRecoveryWithMultipleAddresses()
39+
public void testConnectionRecoveryWithArrayOfAddresses()
3940
throws IOException, InterruptedException, TimeoutException {
4041
final Address[] addresses = {new Address("127.0.0.1"), new Address("127.0.0.1", 5672)};
4142
AutorecoveringConnection c = newRecoveringConnection(addresses);
@@ -49,6 +50,21 @@ public void testConnectionRecoveryWithMultipleAddresses()
4950

5051
}
5152

53+
public void testConnectionRecoveryWithListOfAddresses()
54+
throws IOException, InterruptedException, TimeoutException {
55+
56+
final List<Address> addresses = Arrays.asList(new Address("127.0.0.1"), new Address("127.0.0.1", 5672));
57+
58+
AutorecoveringConnection c = newRecoveringConnection(addresses);
59+
try {
60+
assertTrue(c.isOpen());
61+
closeAndWaitForRecovery(c);
62+
assertTrue(c.isOpen());
63+
} finally {
64+
c.abort();
65+
}
66+
}
67+
5268
public void testConnectionRecoveryWithDisabledTopologyRecovery()
5369
throws IOException, InterruptedException, TimeoutException {
5470
AutorecoveringConnection c = newRecoveringConnection(true);
@@ -705,17 +721,28 @@ private AutorecoveringConnection newRecoveringConnection(boolean disableTopology
705721
return (AutorecoveringConnection) cf.newConnection();
706722
}
707723

724+
private AutorecoveringConnection newRecoveringConnection(boolean disableTopologyRecovery, Address[] addresses)
725+
throws IOException, TimeoutException {
726+
ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery);
727+
return (AutorecoveringConnection) cf.newConnection(addresses);
728+
}
729+
708730
private AutorecoveringConnection newRecoveringConnection(Address[] addresses)
709731
throws IOException, TimeoutException {
710-
return newRecoveringConnection(false, addresses);
732+
return newRecoveringConnection(false, Arrays.asList(addresses));
711733
}
712734

713-
private AutorecoveringConnection newRecoveringConnection(boolean disableTopologyRecovery, Address[] addresses)
735+
private AutorecoveringConnection newRecoveringConnection(boolean disableTopologyRecovery, List<Address> addresses)
714736
throws IOException, TimeoutException {
715737
ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery);
716738
return (AutorecoveringConnection) cf.newConnection(addresses);
717739
}
718740

741+
private AutorecoveringConnection newRecoveringConnection(List<Address> addresses)
742+
throws IOException, TimeoutException {
743+
return newRecoveringConnection(false, addresses);
744+
}
745+
719746
private ConnectionFactory buildConnectionFactoryWithRecoveryEnabled(boolean disableTopologyRecovery) {
720747
ConnectionFactory cf = new ConnectionFactory();
721748
cf.setNetworkRecoveryInterval(RECOVERY_INTERVAL);

test/src/com/rabbitmq/client/test/functional/FrameMax.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,16 @@
1717

1818
package com.rabbitmq.client.test.functional;
1919

20-
import com.rabbitmq.client.impl.ConnectionParams;
20+
import com.rabbitmq.client.*;
21+
import com.rabbitmq.client.impl.*;
2122
import com.rabbitmq.client.test.BrokerTestCase;
2223

2324
import java.io.IOException;
2425
import java.net.Socket;
26+
import java.util.List;
2527
import java.util.concurrent.ExecutorService;
2628
import java.util.concurrent.TimeoutException;
2729

28-
import com.rabbitmq.client.Address;
29-
import com.rabbitmq.client.AMQP;
30-
import com.rabbitmq.client.Connection;
31-
import com.rabbitmq.client.ConnectionFactory;
32-
import com.rabbitmq.client.GetResponse;
33-
import com.rabbitmq.client.impl.AMQConnection;
34-
import com.rabbitmq.client.impl.AMQCommand;
35-
import com.rabbitmq.client.impl.Frame;
36-
import com.rabbitmq.client.impl.FrameHandler;
37-
import com.rabbitmq.client.impl.LongStringHelper;
38-
import com.rabbitmq.client.impl.SocketFrameHandler;
39-
4030
public class FrameMax extends BrokerTestCase {
4131
/* This value for FrameMax is larger than the minimum and less
4232
* than what Rabbit suggests. */
@@ -147,7 +137,7 @@ public GenerousAMQConnection(ConnectionFactory factory,
147137

148138
private static class GenerousConnectionFactory extends ConnectionFactory {
149139

150-
@Override public Connection newConnection(ExecutorService executor, Address[] addrs)
140+
@Override public Connection newConnection(ExecutorService executor, List<Address> addrs)
151141
throws IOException, TimeoutException {
152142
IOException lastException = null;
153143
for (Address addr : addrs) {

0 commit comments

Comments
 (0)