Skip to content

Commit d366ea7

Browse files
Merge pull request #143 from rabbitmq/rabbitmq-java-client-135
Begin recovery after all shutdown listeners have been given a chance to run
2 parents b8ddb13 + bb252ff commit d366ea7

File tree

4 files changed

+112
-32
lines changed

4 files changed

+112
-32
lines changed

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

+26-4
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,7 @@
2121
import java.net.InetAddress;
2222
import java.net.SocketException;
2323
import java.net.SocketTimeoutException;
24-
import java.util.Collection;
25-
import java.util.Collections;
26-
import java.util.HashMap;
27-
import java.util.Map;
24+
import java.util.*;
2825
import java.util.concurrent.*;
2926

3027
import com.rabbitmq.client.AMQP;
@@ -45,6 +42,7 @@
4542
import com.rabbitmq.client.SaslMechanism;
4643
import com.rabbitmq.client.ShutdownSignalException;
4744
import com.rabbitmq.client.impl.AMQChannel.BlockingRpcContinuation;
45+
import com.rabbitmq.client.impl.recovery.RecoveryCanBeginListener;
4846
import com.rabbitmq.utility.BlockingCell;
4947

5048
final class Copyright {
@@ -65,6 +63,9 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
6563
private Thread mainLoopThread;
6664
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
6765

66+
private final List<RecoveryCanBeginListener> recoveryCanBeginListeners =
67+
new ArrayList<RecoveryCanBeginListener>();
68+
6869
/**
6970
* Retrieve a copy of the default table of client properties that
7071
* will be sent to the server during connection startup. This
@@ -576,10 +577,31 @@ public void run() {
576577
_frameHandler.close();
577578
_appContinuation.set(null);
578579
notifyListeners();
580+
// assuming that shutdown listeners do not do anything
581+
// asynchronously, e.g. start new threads, this effectively
582+
// guarantees that we only begin recovery when all shutdown
583+
// listeners have executed
584+
notifyRecoveryCanBeginListeners();
579585
}
580586
}
581587
}
582588

589+
private void notifyRecoveryCanBeginListeners() {
590+
ShutdownSignalException sse = this.getCloseReason();
591+
for(RecoveryCanBeginListener fn : this.recoveryCanBeginListeners) {
592+
fn.recoveryCanBegin(sse);
593+
}
594+
}
595+
596+
public void addRecoveryCanBeginListener(RecoveryCanBeginListener fn) {
597+
this.recoveryCanBeginListeners.add(fn);
598+
}
599+
600+
@SuppressWarnings(value = "unused")
601+
public void removeRecoveryCanBeginListener(RecoveryCanBeginListener fn) {
602+
this.recoveryCanBeginListeners.remove(fn);
603+
}
604+
583605
/**
584606
* Called when a frame-read operation times out
585607
* @throws MissedHeartbeatException if heart-beats have been missed

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

+26-17
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.rabbitmq.client.ShutdownListener;
1212
import com.rabbitmq.client.ShutdownSignalException;
1313
import com.rabbitmq.client.TopologyRecoveryException;
14+
import com.rabbitmq.client.impl.AMQConnection;
1415
import com.rabbitmq.client.impl.ConnectionParams;
1516
import com.rabbitmq.client.ExceptionHandler;
1617
import com.rabbitmq.client.impl.FrameHandlerFactory;
@@ -254,6 +255,13 @@ public void abort(int timeout) {
254255
delegate.abort(timeout);
255256
}
256257

258+
/**
259+
* Not supposed to be used outside of automated tests.
260+
*/
261+
public AMQConnection getDelegate() {
262+
return delegate;
263+
}
264+
257265
/**
258266
* @see com.rabbitmq.client.Connection#getCloseReason()
259267
*/
@@ -376,8 +384,10 @@ public int getLocalPort() {
376384

377385
private void addAutomaticRecoveryListener() {
378386
final AutorecoveringConnection c = this;
379-
ShutdownListener automaticRecoveryListener = new ShutdownListener() {
380-
public void shutdownCompleted(ShutdownSignalException cause) {
387+
// this listener will run after shutdown listeners,
388+
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/135
389+
RecoveryCanBeginListener starter = new RecoveryCanBeginListener() {
390+
public void recoveryCanBegin(ShutdownSignalException cause) {
381391
try {
382392
if (shouldTriggerConnectionRecovery(cause)) {
383393
c.beginAutomaticRecovery();
@@ -388,10 +398,7 @@ public void shutdownCompleted(ShutdownSignalException cause) {
388398
}
389399
};
390400
synchronized (this) {
391-
if(!this.shutdownHooks.contains(automaticRecoveryListener)) {
392-
this.shutdownHooks.add(automaticRecoveryListener);
393-
}
394-
this.delegate.addShutdownListener(automaticRecoveryListener);
401+
this.delegate.addRecoveryCanBeginListener(starter);
395402
}
396403
}
397404

@@ -441,18 +448,20 @@ public void removeConsumerRecoveryListener(ConsumerRecoveryListener listener) {
441448

442449
synchronized private void beginAutomaticRecovery() throws InterruptedException, IOException, TopologyRecoveryException {
443450
Thread.sleep(this.params.getNetworkRecoveryInterval());
444-
if (!this.recoverConnection())
445-
return;
446-
447-
this.recoverShutdownListeners();
448-
this.recoverBlockedListeners();
449-
this.recoverChannels();
450-
if(this.params.isTopologyRecoveryEnabled()) {
451-
this.recoverEntities();
452-
this.recoverConsumers();
453-
}
451+
if (!this.recoverConnection()) {
452+
return;
453+
}
454454

455-
this.notifyRecoveryListeners();
455+
this.addAutomaticRecoveryListener();
456+
this.recoverShutdownListeners();
457+
this.recoverBlockedListeners();
458+
this.recoverChannels();
459+
if(this.params.isTopologyRecoveryEnabled()) {
460+
this.recoverEntities();
461+
this.recoverConsumers();
462+
}
463+
464+
this.notifyRecoveryListeners();
456465
}
457466

458467
private void recoverShutdownListeners() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.rabbitmq.client.impl.recovery;
2+
3+
import com.rabbitmq.client.ShutdownSignalException;
4+
5+
/**
6+
* Used internally to indicate when connection recovery can
7+
* begin. See {@link https://github.com/rabbitmq/rabbitmq-java-client/issues/135}.
8+
* This is package-local by design.
9+
*/
10+
public interface RecoveryCanBeginListener {
11+
void recoveryCanBegin(ShutdownSignalException cause);
12+
}

src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java

+48-11
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
package com.rabbitmq.client.test.functional;
22

33
import com.rabbitmq.client.*;
4-
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
5-
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
4+
import com.rabbitmq.client.impl.AMQConnection;
5+
import com.rabbitmq.client.impl.recovery.*;
66
import com.rabbitmq.client.Recoverable;
77
import com.rabbitmq.client.RecoveryListener;
8-
import com.rabbitmq.client.impl.recovery.ConsumerRecoveryListener;
9-
import com.rabbitmq.client.impl.recovery.QueueRecoveryListener;
108
import com.rabbitmq.client.test.BrokerTestCase;
119
import com.rabbitmq.tools.Host;
1210

@@ -21,8 +19,9 @@
2119
import java.util.concurrent.atomic.AtomicInteger;
2220
import java.util.concurrent.atomic.AtomicReference;
2321

22+
@SuppressWarnings("ThrowFromFinallyBlock")
2423
public class ConnectionRecovery extends BrokerTestCase {
25-
public static final long RECOVERY_INTERVAL = 2000;
24+
private static final long RECOVERY_INTERVAL = 2000;
2625

2726
public void testConnectionRecovery() throws IOException, InterruptedException {
2827
assertTrue(connection.isOpen());
@@ -89,6 +88,44 @@ public void testConnectionRecoveryWithDisabledTopologyRecovery()
8988
}
9089
}
9190

91+
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/135
92+
public void testThatShutdownHooksOnConnectionFireBeforeRecoveryStarts() throws IOException, InterruptedException {
93+
final List<String> events = new ArrayList<String>();
94+
final CountDownLatch latch = new CountDownLatch(1);
95+
connection.addShutdownListener(new ShutdownListener() {
96+
public void shutdownCompleted(ShutdownSignalException cause) {
97+
events.add("shutdown hook 1");
98+
}
99+
});
100+
connection.addShutdownListener(new ShutdownListener() {
101+
public void shutdownCompleted(ShutdownSignalException cause) {
102+
events.add("shutdown hook 2");
103+
}
104+
});
105+
// note: we do not want to expose RecoveryCanBeginListener so this
106+
// test does not use it
107+
((AutorecoveringConnection)connection).getDelegate().addRecoveryCanBeginListener(new RecoveryCanBeginListener() {
108+
@Override
109+
public void recoveryCanBegin(ShutdownSignalException cause) {
110+
events.add("recovery start hook 1");
111+
}
112+
});
113+
((AutorecoveringConnection)connection).addRecoveryListener(new RecoveryListener() {
114+
@Override
115+
public void handleRecovery(Recoverable recoverable) {
116+
latch.countDown();
117+
}
118+
});
119+
assertTrue(connection.isOpen());
120+
closeAndWaitForRecovery();
121+
assertTrue(connection.isOpen());
122+
assertEquals("shutdown hook 1", events.get(0));
123+
assertEquals("shutdown hook 2", events.get(1));
124+
assertEquals("recovery start hook 1", events.get(2));
125+
connection.close();
126+
wait(latch);
127+
}
128+
92129
public void testShutdownHooksRecoveryOnConnection() throws IOException, InterruptedException {
93130
final CountDownLatch latch = new CountDownLatch(2);
94131
connection.addShutdownListener(new ShutdownListener() {
@@ -211,7 +248,7 @@ public void testClientNamedQueueRecoveryWithNoWait() throws IOException, Interru
211248
testClientNamedQueueRecoveryWith("java-client.test.recovery.q1-nowait", true);
212249
}
213250

214-
protected void testClientNamedQueueRecoveryWith(String q, boolean noWait) throws IOException, InterruptedException, TimeoutException {
251+
private void testClientNamedQueueRecoveryWith(String q, boolean noWait) throws IOException, InterruptedException, TimeoutException {
215252
Channel ch = connection.createChannel();
216253
if(noWait) {
217254
declareClientNamedQueueNoWait(ch, q);
@@ -750,20 +787,20 @@ private ConnectionFactory buildConnectionFactoryWithRecoveryEnabled(boolean disa
750787
}
751788

752789
private static void wait(CountDownLatch latch) throws InterruptedException {
753-
// Very very generous amount of time to wait, just make sure we never
754-
// hang forever
755-
assertTrue(latch.await(1800, TimeUnit.SECONDS));
790+
// we want to wait for recovery to complete for a reasonable amount of time
791+
// but still make recovery failures easy to notice in development environments
792+
assertTrue(latch.await(90, TimeUnit.SECONDS));
756793
}
757794

758795
private void waitForConfirms(Channel ch) throws InterruptedException, TimeoutException {
759796
ch.waitForConfirms(30 * 60 * 1000);
760797
}
761798

762-
protected void assertRecordedQueues(Connection conn, int size) {
799+
private void assertRecordedQueues(Connection conn, int size) {
763800
assertEquals(size, ((AutorecoveringConnection)conn).getRecordedQueues().size());
764801
}
765802

766-
protected void assertRecordedExchanges(Connection conn, int size) {
803+
private void assertRecordedExchanges(Connection conn, int size) {
767804
assertEquals(size, ((AutorecoveringConnection)conn).getRecordedExchanges().size());
768805
}
769806
}

0 commit comments

Comments
 (0)