diff --git a/build.xml b/build.xml
index 0b5cc50339..b0e0fe931f 100644
--- a/build.xml
+++ b/build.xml
@@ -87,7 +87,8 @@
- automatic connection recovery
+ * is enabled, the connection returned by this method will be {@link Recoverable}. Future
+ * reconnection attempts will pick a random accessible address from the provided list.
+ *
+ * @param addr_list a List of known broker addresses (hostname/port pairs) to try in order
+ * @return an interface to the connection
+ * @throws IOException if it encounters a problem
+ */
+ public Connection newConnection(List addr_list) throws IOException, TimeoutException {
+ return newConnection(this.sharedExecutor, addr_list);
}
/**
@@ -658,7 +676,25 @@ public Connection newConnection(Address[] addrs) throws IOException, TimeoutExce
* @throws java.io.IOException if it encounters a problem
* @see Automatic Recovery
*/
- public Connection newConnection(ExecutorService executor, Address[] addrs)
+ public Connection newConnection(ExecutorService executor, Address[] addrs) throws IOException, TimeoutException {
+ return newConnection(executor, Arrays.asList(addrs));
+ }
+
+ /**
+ * Create a new broker connection, picking the first available address from
+ * the list.
+ *
+ * If automatic connection recovery
+ * is enabled, the connection returned by this method will be {@link Recoverable}. Future
+ * reconnection attempts will pick a random accessible address from the provided list.
+ *
+ * @param executor thread execution service for consumers on the connection
+ * @param addr_list a List of known broker addresses (hostname/port pairs) to try in order
+ * @return an interface to the connection
+ * @throws java.io.IOException if it encounters a problem
+ * @see Automatic Recovery
+ */
+ public Connection newConnection(ExecutorService executor, List addr_list)
throws IOException, TimeoutException {
// make sure we respect the provided thread factory
FrameHandlerFactory fhFactory = createFrameHandlerFactory();
@@ -666,12 +702,13 @@ public Connection newConnection(ExecutorService executor, Address[] addrs)
if (isAutomaticRecoveryEnabled()) {
// see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
- AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addrs);
+ AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addr_list);
+
conn.init();
return conn;
} else {
IOException lastException = null;
- for (Address addr : addrs) {
+ for (Address addr : addr_list) {
try {
FrameHandler handler = fhFactory.create(addr);
AMQConnection conn = new AMQConnection(params, handler);
@@ -719,9 +756,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
* @throws IOException if it encounters a problem
*/
public Connection newConnection() throws IOException, TimeoutException {
- return newConnection(this.sharedExecutor,
- new Address[] {new Address(getHost(), getPort())}
- );
+ return newConnection(this.sharedExecutor, Arrays.asList(new Address(getHost(), getPort())));
}
/**
@@ -736,9 +771,7 @@ public Connection newConnection() throws IOException, TimeoutException {
* @throws IOException if it encounters a problem
*/
public Connection newConnection(ExecutorService executor) throws IOException, TimeoutException {
- return newConnection(executor,
- new Address[] {new Address(getHost(), getPort())}
- );
+ return newConnection(executor, Arrays.asList(new Address(getHost(), getPort())));
}
@Override public ConnectionFactory clone(){
diff --git a/src/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java b/src/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java
index 145b3aa14c..9fc5dd0da0 100644
--- a/src/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java
+++ b/src/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java
@@ -82,6 +82,13 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f,
this.channels = new ConcurrentHashMap();
}
+ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, List addr_list) {
+ this.cf = new RecoveryAwareAMQConnectionFactory(params, f, addr_list);
+ this.params = params;
+
+ this.channels = new ConcurrentHashMap();
+ }
+
/**
* Private API.
* @throws IOException
diff --git a/src/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java b/src/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java
index 4910d1ff5e..99fc17bf55 100644
--- a/src/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java
+++ b/src/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java
@@ -13,42 +13,46 @@
import java.util.concurrent.TimeoutException;
public class RecoveryAwareAMQConnectionFactory {
- private final ConnectionParams params;
- private final FrameHandlerFactory factory;
- private final Address[] addrs;
-
- public RecoveryAwareAMQConnectionFactory(ConnectionParams params, FrameHandlerFactory factory, Address[] addrs) {
- this.params = params;
- this.factory = factory;
- this.addrs = addrs;
- }
+ private final ConnectionParams params;
+ private final FrameHandlerFactory factory;
+ private final List addrs;
- /**
- * @return an interface to the connection
- * @throws java.io.IOException if it encounters a problem
- */
- RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException {
- IOException lastException = null;
- Address[] shuffled = shuffle(addrs);
- for (Address addr : shuffled) {
- try {
- FrameHandler frameHandler = factory.create(addr);
- RecoveryAwareAMQConnection conn = new RecoveryAwareAMQConnection(params, frameHandler);
- conn.start();
- return conn;
- } catch (IOException e) {
- lastException = e;
- }
- }
-
- throw (lastException != null) ? lastException : new IOException("failed to connect");
- }
+ public RecoveryAwareAMQConnectionFactory(ConnectionParams params, FrameHandlerFactory factory, Address[] addrs) {
+ this.params = params;
+ this.factory = factory;
+ this.addrs = Arrays.asList(addrs);
+ }
+
+ public RecoveryAwareAMQConnectionFactory(ConnectionParams params, FrameHandlerFactory factory, List addrs){
+ this.params = params;
+ this.factory = factory;
+ this.addrs = addrs;
+ }
+ /**
+ * @return an interface to the connection
+ * @throws java.io.IOException if it encounters a problem
+ */
+ RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException {
+ IOException lastException = null;
+ List shuffled = shuffle(addrs);
- private Address[] shuffle(Address[] addrs) {
- List list = new ArrayList(Arrays.asList(addrs));
- Collections.shuffle(list);
- Address[] result = new Address[addrs.length];
- list.toArray(result);
- return result;
+ for (Address addr : shuffled) {
+ try {
+ FrameHandler frameHandler = factory.create(addr);
+ RecoveryAwareAMQConnection conn = new RecoveryAwareAMQConnection(params, frameHandler);
+ conn.start();
+ return conn;
+ } catch (IOException e) {
+ lastException = e;
+ }
}
+
+ throw (lastException != null) ? lastException : new IOException("failed to connect");
+ }
+
+ private List shuffle(List addrs) {
+ List list = new ArrayList(addrs);
+ Collections.shuffle(list);
+ return list;
+ }
}
diff --git a/test/src/com/rabbitmq/client/test/functional/ConnectionRecovery.java b/test/src/com/rabbitmq/client/test/functional/ConnectionRecovery.java
index c310c81c1b..68dcb4f1a2 100644
--- a/test/src/com/rabbitmq/client/test/functional/ConnectionRecovery.java
+++ b/test/src/com/rabbitmq/client/test/functional/ConnectionRecovery.java
@@ -11,6 +11,7 @@
import com.rabbitmq.tools.Host;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;
import java.util.UUID;
@@ -35,7 +36,7 @@ public void testConnectionRecoveryWithServerRestart() throws IOException, Interr
assertTrue(connection.isOpen());
}
- public void testConnectionRecoveryWithMultipleAddresses()
+ public void testConnectionRecoveryWithArrayOfAddresses()
throws IOException, InterruptedException, TimeoutException {
final Address[] addresses = {new Address("127.0.0.1"), new Address("127.0.0.1", 5672)};
AutorecoveringConnection c = newRecoveringConnection(addresses);
@@ -49,6 +50,21 @@ public void testConnectionRecoveryWithMultipleAddresses()
}
+ public void testConnectionRecoveryWithListOfAddresses()
+ throws IOException, InterruptedException, TimeoutException {
+
+ final List addresses = Arrays.asList(new Address("127.0.0.1"), new Address("127.0.0.1", 5672));
+
+ AutorecoveringConnection c = newRecoveringConnection(addresses);
+ try {
+ assertTrue(c.isOpen());
+ closeAndWaitForRecovery(c);
+ assertTrue(c.isOpen());
+ } finally {
+ c.abort();
+ }
+ }
+
public void testConnectionRecoveryWithDisabledTopologyRecovery()
throws IOException, InterruptedException, TimeoutException {
AutorecoveringConnection c = newRecoveringConnection(true);
@@ -705,17 +721,28 @@ private AutorecoveringConnection newRecoveringConnection(boolean disableTopology
return (AutorecoveringConnection) cf.newConnection();
}
+ private AutorecoveringConnection newRecoveringConnection(boolean disableTopologyRecovery, Address[] addresses)
+ throws IOException, TimeoutException {
+ ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery);
+ return (AutorecoveringConnection) cf.newConnection(addresses);
+ }
+
private AutorecoveringConnection newRecoveringConnection(Address[] addresses)
throws IOException, TimeoutException {
- return newRecoveringConnection(false, addresses);
+ return newRecoveringConnection(false, Arrays.asList(addresses));
}
- private AutorecoveringConnection newRecoveringConnection(boolean disableTopologyRecovery, Address[] addresses)
+ private AutorecoveringConnection newRecoveringConnection(boolean disableTopologyRecovery, List addresses)
throws IOException, TimeoutException {
ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery);
return (AutorecoveringConnection) cf.newConnection(addresses);
}
+ private AutorecoveringConnection newRecoveringConnection(List addresses)
+ throws IOException, TimeoutException {
+ return newRecoveringConnection(false, addresses);
+ }
+
private ConnectionFactory buildConnectionFactoryWithRecoveryEnabled(boolean disableTopologyRecovery) {
ConnectionFactory cf = new ConnectionFactory();
cf.setNetworkRecoveryInterval(RECOVERY_INTERVAL);
diff --git a/test/src/com/rabbitmq/client/test/functional/FrameMax.java b/test/src/com/rabbitmq/client/test/functional/FrameMax.java
index d6f31fc5db..9a388a93ca 100644
--- a/test/src/com/rabbitmq/client/test/functional/FrameMax.java
+++ b/test/src/com/rabbitmq/client/test/functional/FrameMax.java
@@ -17,26 +17,16 @@
package com.rabbitmq.client.test.functional;
-import com.rabbitmq.client.impl.ConnectionParams;
+import com.rabbitmq.client.*;
+import com.rabbitmq.client.impl.*;
import com.rabbitmq.client.test.BrokerTestCase;
import java.io.IOException;
import java.net.Socket;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
-import com.rabbitmq.client.Address;
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.GetResponse;
-import com.rabbitmq.client.impl.AMQConnection;
-import com.rabbitmq.client.impl.AMQCommand;
-import com.rabbitmq.client.impl.Frame;
-import com.rabbitmq.client.impl.FrameHandler;
-import com.rabbitmq.client.impl.LongStringHelper;
-import com.rabbitmq.client.impl.SocketFrameHandler;
-
public class FrameMax extends BrokerTestCase {
/* This value for FrameMax is larger than the minimum and less
* than what Rabbit suggests. */
@@ -147,7 +137,7 @@ public GenerousAMQConnection(ConnectionFactory factory,
private static class GenerousConnectionFactory extends ConnectionFactory {
- @Override public Connection newConnection(ExecutorService executor, Address[] addrs)
+ @Override public Connection newConnection(ExecutorService executor, List addrs)
throws IOException, TimeoutException {
IOException lastException = null;
for (Address addr : addrs) {