diff --git a/build.xml b/build.xml index 0b5cc50339..b0e0fe931f 100644 --- a/build.xml +++ b/build.xml @@ -87,7 +87,8 @@ - automatic connection recovery + * is enabled, the connection returned by this method will be {@link Recoverable}. Future + * reconnection attempts will pick a random accessible address from the provided list. + * + * @param addr_list a List of known broker addresses (hostname/port pairs) to try in order + * @return an interface to the connection + * @throws IOException if it encounters a problem + */ + public Connection newConnection(List
addr_list) throws IOException, TimeoutException { + return newConnection(this.sharedExecutor, addr_list); } /** @@ -658,7 +676,25 @@ public Connection newConnection(Address[] addrs) throws IOException, TimeoutExce * @throws java.io.IOException if it encounters a problem * @see Automatic Recovery */ - public Connection newConnection(ExecutorService executor, Address[] addrs) + public Connection newConnection(ExecutorService executor, Address[] addrs) throws IOException, TimeoutException { + return newConnection(executor, Arrays.asList(addrs)); + } + + /** + * Create a new broker connection, picking the first available address from + * the list. + * + * If automatic connection recovery + * is enabled, the connection returned by this method will be {@link Recoverable}. Future + * reconnection attempts will pick a random accessible address from the provided list. + * + * @param executor thread execution service for consumers on the connection + * @param addr_list a List of known broker addresses (hostname/port pairs) to try in order + * @return an interface to the connection + * @throws java.io.IOException if it encounters a problem + * @see Automatic Recovery + */ + public Connection newConnection(ExecutorService executor, List
addr_list) throws IOException, TimeoutException { // make sure we respect the provided thread factory FrameHandlerFactory fhFactory = createFrameHandlerFactory(); @@ -666,12 +702,13 @@ public Connection newConnection(ExecutorService executor, Address[] addrs) if (isAutomaticRecoveryEnabled()) { // see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection - AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addrs); + AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addr_list); + conn.init(); return conn; } else { IOException lastException = null; - for (Address addr : addrs) { + for (Address addr : addr_list) { try { FrameHandler handler = fhFactory.create(addr); AMQConnection conn = new AMQConnection(params, handler); @@ -719,9 +756,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) { * @throws IOException if it encounters a problem */ public Connection newConnection() throws IOException, TimeoutException { - return newConnection(this.sharedExecutor, - new Address[] {new Address(getHost(), getPort())} - ); + return newConnection(this.sharedExecutor, Arrays.asList(new Address(getHost(), getPort()))); } /** @@ -736,9 +771,7 @@ public Connection newConnection() throws IOException, TimeoutException { * @throws IOException if it encounters a problem */ public Connection newConnection(ExecutorService executor) throws IOException, TimeoutException { - return newConnection(executor, - new Address[] {new Address(getHost(), getPort())} - ); + return newConnection(executor, Arrays.asList(new Address(getHost(), getPort()))); } @Override public ConnectionFactory clone(){ diff --git a/src/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java b/src/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java index 145b3aa14c..9fc5dd0da0 100644 --- a/src/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java +++ b/src/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java @@ -82,6 +82,13 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, this.channels = new ConcurrentHashMap(); } + public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, List
addr_list) { + this.cf = new RecoveryAwareAMQConnectionFactory(params, f, addr_list); + this.params = params; + + this.channels = new ConcurrentHashMap(); + } + /** * Private API. * @throws IOException diff --git a/src/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java b/src/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java index 4910d1ff5e..99fc17bf55 100644 --- a/src/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java +++ b/src/com/rabbitmq/client/impl/recovery/RecoveryAwareAMQConnectionFactory.java @@ -13,42 +13,46 @@ import java.util.concurrent.TimeoutException; public class RecoveryAwareAMQConnectionFactory { - private final ConnectionParams params; - private final FrameHandlerFactory factory; - private final Address[] addrs; - - public RecoveryAwareAMQConnectionFactory(ConnectionParams params, FrameHandlerFactory factory, Address[] addrs) { - this.params = params; - this.factory = factory; - this.addrs = addrs; - } + private final ConnectionParams params; + private final FrameHandlerFactory factory; + private final List
addrs; - /** - * @return an interface to the connection - * @throws java.io.IOException if it encounters a problem - */ - RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException { - IOException lastException = null; - Address[] shuffled = shuffle(addrs); - for (Address addr : shuffled) { - try { - FrameHandler frameHandler = factory.create(addr); - RecoveryAwareAMQConnection conn = new RecoveryAwareAMQConnection(params, frameHandler); - conn.start(); - return conn; - } catch (IOException e) { - lastException = e; - } - } - - throw (lastException != null) ? lastException : new IOException("failed to connect"); - } + public RecoveryAwareAMQConnectionFactory(ConnectionParams params, FrameHandlerFactory factory, Address[] addrs) { + this.params = params; + this.factory = factory; + this.addrs = Arrays.asList(addrs); + } + + public RecoveryAwareAMQConnectionFactory(ConnectionParams params, FrameHandlerFactory factory, List
addrs){ + this.params = params; + this.factory = factory; + this.addrs = addrs; + } + /** + * @return an interface to the connection + * @throws java.io.IOException if it encounters a problem + */ + RecoveryAwareAMQConnection newConnection() throws IOException, TimeoutException { + IOException lastException = null; + List
shuffled = shuffle(addrs); - private Address[] shuffle(Address[] addrs) { - List
list = new ArrayList
(Arrays.asList(addrs)); - Collections.shuffle(list); - Address[] result = new Address[addrs.length]; - list.toArray(result); - return result; + for (Address addr : shuffled) { + try { + FrameHandler frameHandler = factory.create(addr); + RecoveryAwareAMQConnection conn = new RecoveryAwareAMQConnection(params, frameHandler); + conn.start(); + return conn; + } catch (IOException e) { + lastException = e; + } } + + throw (lastException != null) ? lastException : new IOException("failed to connect"); + } + + private List
shuffle(List
addrs) { + List
list = new ArrayList
(addrs); + Collections.shuffle(list); + return list; + } } diff --git a/test/src/com/rabbitmq/client/test/functional/ConnectionRecovery.java b/test/src/com/rabbitmq/client/test/functional/ConnectionRecovery.java index c310c81c1b..68dcb4f1a2 100644 --- a/test/src/com/rabbitmq/client/test/functional/ConnectionRecovery.java +++ b/test/src/com/rabbitmq/client/test/functional/ConnectionRecovery.java @@ -11,6 +11,7 @@ import com.rabbitmq.tools.Host; import java.io.IOException; +import java.util.Arrays; import java.util.List; import java.util.ArrayList; import java.util.UUID; @@ -35,7 +36,7 @@ public void testConnectionRecoveryWithServerRestart() throws IOException, Interr assertTrue(connection.isOpen()); } - public void testConnectionRecoveryWithMultipleAddresses() + public void testConnectionRecoveryWithArrayOfAddresses() throws IOException, InterruptedException, TimeoutException { final Address[] addresses = {new Address("127.0.0.1"), new Address("127.0.0.1", 5672)}; AutorecoveringConnection c = newRecoveringConnection(addresses); @@ -49,6 +50,21 @@ public void testConnectionRecoveryWithMultipleAddresses() } + public void testConnectionRecoveryWithListOfAddresses() + throws IOException, InterruptedException, TimeoutException { + + final List
addresses = Arrays.asList(new Address("127.0.0.1"), new Address("127.0.0.1", 5672)); + + AutorecoveringConnection c = newRecoveringConnection(addresses); + try { + assertTrue(c.isOpen()); + closeAndWaitForRecovery(c); + assertTrue(c.isOpen()); + } finally { + c.abort(); + } + } + public void testConnectionRecoveryWithDisabledTopologyRecovery() throws IOException, InterruptedException, TimeoutException { AutorecoveringConnection c = newRecoveringConnection(true); @@ -705,17 +721,28 @@ private AutorecoveringConnection newRecoveringConnection(boolean disableTopology return (AutorecoveringConnection) cf.newConnection(); } + private AutorecoveringConnection newRecoveringConnection(boolean disableTopologyRecovery, Address[] addresses) + throws IOException, TimeoutException { + ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery); + return (AutorecoveringConnection) cf.newConnection(addresses); + } + private AutorecoveringConnection newRecoveringConnection(Address[] addresses) throws IOException, TimeoutException { - return newRecoveringConnection(false, addresses); + return newRecoveringConnection(false, Arrays.asList(addresses)); } - private AutorecoveringConnection newRecoveringConnection(boolean disableTopologyRecovery, Address[] addresses) + private AutorecoveringConnection newRecoveringConnection(boolean disableTopologyRecovery, List
addresses) throws IOException, TimeoutException { ConnectionFactory cf = buildConnectionFactoryWithRecoveryEnabled(disableTopologyRecovery); return (AutorecoveringConnection) cf.newConnection(addresses); } + private AutorecoveringConnection newRecoveringConnection(List
addresses) + throws IOException, TimeoutException { + return newRecoveringConnection(false, addresses); + } + private ConnectionFactory buildConnectionFactoryWithRecoveryEnabled(boolean disableTopologyRecovery) { ConnectionFactory cf = new ConnectionFactory(); cf.setNetworkRecoveryInterval(RECOVERY_INTERVAL); diff --git a/test/src/com/rabbitmq/client/test/functional/FrameMax.java b/test/src/com/rabbitmq/client/test/functional/FrameMax.java index d6f31fc5db..9a388a93ca 100644 --- a/test/src/com/rabbitmq/client/test/functional/FrameMax.java +++ b/test/src/com/rabbitmq/client/test/functional/FrameMax.java @@ -17,26 +17,16 @@ package com.rabbitmq.client.test.functional; -import com.rabbitmq.client.impl.ConnectionParams; +import com.rabbitmq.client.*; +import com.rabbitmq.client.impl.*; import com.rabbitmq.client.test.BrokerTestCase; import java.io.IOException; import java.net.Socket; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; -import com.rabbitmq.client.Address; -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.GetResponse; -import com.rabbitmq.client.impl.AMQConnection; -import com.rabbitmq.client.impl.AMQCommand; -import com.rabbitmq.client.impl.Frame; -import com.rabbitmq.client.impl.FrameHandler; -import com.rabbitmq.client.impl.LongStringHelper; -import com.rabbitmq.client.impl.SocketFrameHandler; - public class FrameMax extends BrokerTestCase { /* This value for FrameMax is larger than the minimum and less * than what Rabbit suggests. */ @@ -147,7 +137,7 @@ public GenerousAMQConnection(ConnectionFactory factory, private static class GenerousConnectionFactory extends ConnectionFactory { - @Override public Connection newConnection(ExecutorService executor, Address[] addrs) + @Override public Connection newConnection(ExecutorService executor, List
addrs) throws IOException, TimeoutException { IOException lastException = null; for (Address addr : addrs) {