diff --git a/src/main/java/com/rabbitmq/client/RecoverableConnection.java b/src/main/java/com/rabbitmq/client/RecoverableConnection.java new file mode 100644 index 0000000000..2751793d47 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/RecoverableConnection.java @@ -0,0 +1,25 @@ +// The contents of this file are subject to the Mozilla Public License +// Version 1.1 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at http://www.mozilla.org/MPL/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is GoPivotal, Inc. +// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +// + +package com.rabbitmq.client; +/** + * Convenience interface which combines a Connection and Recoverable together. + * + * @see Connection + * @see Recoverable + * + */ +public interface RecoverableConnection extends Connection, Recoverable { } diff --git a/src/main/java/com/rabbitmq/client/RecoverableShutdownSignalException.java b/src/main/java/com/rabbitmq/client/RecoverableShutdownSignalException.java new file mode 100644 index 0000000000..4fd9fcd468 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/RecoverableShutdownSignalException.java @@ -0,0 +1,45 @@ +package com.rabbitmq.client; + +public class RecoverableShutdownSignalException extends ShutdownSignalException { + + /** Default for non-checking. */ + private static final long serialVersionUID = 1L; + + private final boolean _recoveryToBeAttempted; + + public RecoverableShutdownSignalException(boolean hardError, boolean initiatedByApplication, Method reason, + Object ref) { + this(hardError, initiatedByApplication, reason, ref, "", null, false); + } + + /** + * Construct a RecoverableShutdownSignalException from the arguments. + * @param hardError the relevant hard error + * @param initiatedByApplication if the shutdown was client-initiated + * @param reason AMQP method describing the exception reason + * @param ref Reference to Connection or Channel that fired the signal + * @param messagePrefix prefix to add to exception message + */ + public RecoverableShutdownSignalException(boolean hardError, + boolean initiatedByApplication, + Method reason, Object ref, String messagePrefix, Throwable cause) + { + this(hardError, initiatedByApplication, reason, ref, messagePrefix, cause, false); + } + + public RecoverableShutdownSignalException(ShutdownSignalException sse, boolean recoveryToBeAttempted) { + this(sse.isHardError(), sse.isInitiatedByApplication(), sse.getReason(), sse.getReference(), sse.getMessagePrefix(), sse.getCause(), recoveryToBeAttempted); + } + + public RecoverableShutdownSignalException(boolean hardError, + boolean initiatedByApplication, + Method reason, Object ref, String messagePrefix, Throwable cause, boolean recoveryToBeAttempted) + { + super(hardError, initiatedByApplication, reason, ref, messagePrefix, cause); + this._recoveryToBeAttempted = recoveryToBeAttempted; + } + + public boolean isRecoveryToBeAttempted() { + return _recoveryToBeAttempted; + } +} diff --git a/src/main/java/com/rabbitmq/client/ShutdownSignalException.java b/src/main/java/com/rabbitmq/client/ShutdownSignalException.java index d6d757166a..74ef77e945 100644 --- a/src/main/java/com/rabbitmq/client/ShutdownSignalException.java +++ b/src/main/java/com/rabbitmq/client/ShutdownSignalException.java @@ -45,6 +45,8 @@ public class ShutdownSignalException extends RuntimeException implements Sensibl /** Either Channel or Connection instance, depending on _hardError */ private final Object _ref; + + private final String _messagePrefix; /** * Construct a ShutdownSignalException from the arguments. @@ -78,6 +80,7 @@ public ShutdownSignalException(boolean hardError, this._reason = reason; // Depending on hardError what we got is either Connection or Channel reference this._ref = ref; + this._messagePrefix = messagePrefix; } private static String composeMessage(boolean hardError, boolean initiatedByApplication, @@ -112,6 +115,9 @@ private static String composeMessage(boolean hardError, boolean initiatedByAppli /** @return Reference to Connection or Channel object that fired the signal **/ public Object getReference() { return _ref; } + /** @return The original message prefix used to create this exception **/ + protected String getMessagePrefix() { return _messagePrefix; } + public ShutdownSignalException sensibleClone() { try { return (ShutdownSignalException)super.clone(); diff --git a/src/main/java/com/rabbitmq/client/impl/ForwardingShutdownNotifier.java b/src/main/java/com/rabbitmq/client/impl/ForwardingShutdownNotifier.java new file mode 100644 index 0000000000..85e7d0d30d --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/ForwardingShutdownNotifier.java @@ -0,0 +1,104 @@ +// The contents of this file are subject to the Mozilla Public License +// Version 1.1 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at http://www.mozilla.org/MPL/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is GoPivotal, Inc. +// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +// + + +package com.rabbitmq.client.impl; + +import java.util.ArrayList; +import java.util.List; + +import com.rabbitmq.client.ShutdownListener; +import com.rabbitmq.client.ShutdownNotifier; +import com.rabbitmq.client.ShutdownSignalException; + +/** + * A class that manages {@link ShutdownListener}s and remembers the reason for a shutdown. Both + * {@link com.rabbitmq.client.Channel Channel}s and {@link com.rabbitmq.client.Connection + * Connection}s have shutdown listeners. + * + * The ForwardingShutdownNotifier will only notify listeners when notifyListeners is called, + * and may be called multiple times. + * + * This notifier will always report that it is open. + */ +public class ForwardingShutdownNotifier implements ShutdownNotifier { + + /** Monitor for shutdown listeners and shutdownCause */ + private final Object monitor = new Object(); + + /** List of all shutdown listeners associated with the component */ + private final List shutdownListeners = new ArrayList(); + + private volatile ShutdownSignalException lastShutdownCause = null; + + public void addShutdownListener(ShutdownListener listener) + { + synchronized(this.monitor) { + this.shutdownListeners.add(listener); + } + } + + /** + * Returns the last close reason notified to listeners. + */ + public ShutdownSignalException getCloseReason() { + synchronized(this.monitor) { + return this.lastShutdownCause; + } + } + + /** + * + * Do not call this directly, use notifyListeners(cause) + */ + public void notifyListeners() + { + + } + + public void removeShutdownListener(ShutdownListener listener) + { + synchronized(this.monitor) { + this.shutdownListeners.remove(listener); + } + } + + /** Always reports open */ + public boolean isOpen() { + return true; + } + + /** + * Notifies the registered listeners of this cause. + * @param sse + */ + public void notifyListeners(final ShutdownSignalException sse) { + ShutdownListener[] sdls = null; + synchronized(this.monitor) { + sdls = this.shutdownListeners + .toArray(new ShutdownListener[this.shutdownListeners.size()]); + this.lastShutdownCause = sse; + } + for (ShutdownListener l: sdls) { + try { + l.shutdownCompleted(sse); + } catch (Exception e) { + // FIXME: proper logging + } + } + + } +} diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java index b559625fb5..26f65ddab1 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java @@ -7,12 +7,15 @@ import com.rabbitmq.client.Connection; import com.rabbitmq.client.MissedHeartbeatException; import com.rabbitmq.client.Recoverable; +import com.rabbitmq.client.RecoverableConnection; +import com.rabbitmq.client.RecoverableShutdownSignalException; import com.rabbitmq.client.RecoveryListener; import com.rabbitmq.client.ShutdownListener; import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.client.TopologyRecoveryException; import com.rabbitmq.client.impl.ConnectionParams; import com.rabbitmq.client.ExceptionHandler; +import com.rabbitmq.client.impl.ForwardingShutdownNotifier; import com.rabbitmq.client.impl.FrameHandlerFactory; import com.rabbitmq.client.impl.NetworkConnection; @@ -56,7 +59,7 @@ public class AutorecoveringConnection implements Connection, Recoverable, Networ private final ConnectionParams params; private RecoveryAwareAMQConnection delegate; - private final List shutdownHooks = new ArrayList(); + private final ForwardingShutdownNotifier shutdownHooks = new ForwardingShutdownNotifier(); private final List recoveryListeners = new ArrayList(); private final List blockedListeners = new ArrayList(); @@ -299,16 +302,16 @@ public void close(int closeCode, String closeMessage) throws IOException { * @see Connection#addShutdownListener(com.rabbitmq.client.ShutdownListener) */ public void addShutdownListener(ShutdownListener listener) { - this.shutdownHooks.add(listener); - delegate.addShutdownListener(listener); + this.shutdownHooks.addShutdownListener(listener); + //delegate.addShutdownListener(listener); } /** * @see com.rabbitmq.client.ShutdownNotifier#removeShutdownListener(com.rabbitmq.client.ShutdownListener) */ public void removeShutdownListener(ShutdownListener listener) { - this.shutdownHooks.remove(listener); - delegate.removeShutdownListener(listener); + this.shutdownHooks.removeShutdownListener(listener); + //delegate.removeShutdownListener(listener); } /** @@ -378,8 +381,12 @@ private void addAutomaticRecoveryListener() { final AutorecoveringConnection c = this; ShutdownListener automaticRecoveryListener = new ShutdownListener() { public void shutdownCompleted(ShutdownSignalException cause) { + boolean attemptRecovery = shouldTriggerConnectionRecovery(cause); + //Send a notice to user added hooks. + c.shutdownHooks.notifyListeners(new RecoverableShutdownSignalException(cause, attemptRecovery)); + try { - if (shouldTriggerConnectionRecovery(cause)) { + if (attemptRecovery) { c.beginAutomaticRecovery(); } } catch (Exception e) { @@ -387,12 +394,7 @@ public void shutdownCompleted(ShutdownSignalException cause) { } } }; - synchronized (this) { - if(!this.shutdownHooks.contains(automaticRecoveryListener)) { - this.shutdownHooks.add(automaticRecoveryListener); - } - this.delegate.addShutdownListener(automaticRecoveryListener); - } + this.delegate.addShutdownListener(automaticRecoveryListener); } protected boolean shouldTriggerConnectionRecovery(ShutdownSignalException cause) { @@ -456,9 +458,7 @@ synchronized private void beginAutomaticRecovery() throws InterruptedException, } private void recoverShutdownListeners() { - for (ShutdownListener sh : this.shutdownHooks) { - this.delegate.addShutdownListener(sh); - } + addAutomaticRecoveryListener(); } private void recoverBlockedListeners() {