Update amqp-client to 5.10.0 Change ShutdownListeners to lambda expressions. https://github.com/rabbitmq/rabbitmq-java-client/releases/tag/v5.0.0 Instead of setting channel and connection to null in ShutdownListeners, trust ShutdownNotifierComponent#isOpen() that returns false if there is a shutdown-cause. Change-Id: Ia8304b775fb470613b4ae40210a81f6b1ffb55f9
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl index 0af44ef..2827eee 100644 --- a/external_plugin_deps.bzl +++ b/external_plugin_deps.bzl
@@ -3,6 +3,6 @@ def external_plugin_deps(): maven_jar( name = "amqp_client", - artifact = "com.rabbitmq:amqp-client:4.1.1", - sha1 = "256f6c92c55a8d3cfae8d32e1a15713baedab184", + artifact = "com.rabbitmq:amqp-client:5.10.0", + sha1 = "4de351467a13b8ca4eb7e8023032f9f964a21796", )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java index c77ed05..1d3c4f0 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java +++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/session/type/AMQPSession.java
@@ -26,8 +26,6 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.ShutdownListener; -import com.rabbitmq.client.ShutdownNotifier; import com.rabbitmq.client.ShutdownSignalException; import java.io.IOException; import java.net.URISyntaxException; @@ -41,58 +39,11 @@ public final class AMQPSession implements Session { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); - private class ShutdownListenerImpl implements ShutdownListener { - - private final Class<?> clazz; - - <T extends ShutdownNotifier> ShutdownListenerImpl(Class<T> clazz) { - this.clazz = clazz; - } - - @Override - public void shutdownCompleted(ShutdownSignalException cause) { - if (cause != null) { - Object obj = cause.getReference(); - if (Channel.class.isInstance(obj)) { - Channel.class.cast(obj).removeShutdownListener(this); - } else if (Connection.class.isInstance(obj)) { - Connection.class.cast(obj).removeShutdownListener(this); - } - if (clazz.isInstance(obj)) { - if (clazz == Channel.class) { - Channel ch = Channel.class.cast(obj); - if (cause.isInitiatedByApplication()) { - logger.atInfo().log(MSG("Channel #%d closed by application."), ch.getChannelNumber()); - } else { - logger.atWarning().log( - MSG("Channel #%dclosed. Cause: %s"), ch.getChannelNumber(), cause.getMessage()); - } - if (ch.equals(AMQPSession.this.channel)) { - AMQPSession.this.channel = null; - } - } else if (clazz == Connection.class) { - Connection conn = Connection.class.cast(obj); - if (cause.isInitiatedByApplication()) { - logger.atInfo().log(MSG("Connection closed by application.")); - } else { - logger.atWarning().log(MSG("Connection closed. Cause: %s"), cause.getMessage()); - } - if (conn.equals(AMQPSession.this.connection)) { - AMQPSession.this.connection = null; - } - } - } - } - } - } - private final Properties properties; private volatile Connection connection; private volatile Channel channel; private final AtomicInteger failureCount = new AtomicInteger(0); - private final ShutdownListener connectionListener = new ShutdownListenerImpl(Connection.class); - private final ShutdownListener channelListener = new ShutdownListenerImpl(Channel.class); public AMQPSession(Properties properties) { this.properties = properties; @@ -104,22 +55,31 @@ @Override public boolean isOpen() { - if (connection != null) { + if (connection != null && connection.isOpen()) { return true; } return false; } private Channel getChannel() { - Channel ch = null; - if (connection == null) { + if (!isOpen()) { connect(); } else { try { - ch = connection.createChannel(); - ch.addShutdownListener(channelListener); + Channel ch = connection.createChannel(); + int channelId = ch.getChannelNumber(); + ch.addShutdownListener( + cause -> { + if (cause.isInitiatedByApplication()) { + logger.atInfo().log(MSG("Channel #%d closed by application."), channelId); + } else { + logger.atWarning().log( + MSG("Channel #%d closed. Cause: %s"), channelId, cause.getMessage()); + } + }); failureCount.set(0); - logger.atInfo().log(MSG("Channel #%d opened."), ch.getChannelNumber()); + logger.atInfo().log(MSG("Channel #%d opened."), channelId); + return ch; } catch (IOException | AlreadyClosedException ex) { logger.atSevere().withCause(ex).log(MSG("Failed to open channel.")); failureCount.incrementAndGet(); @@ -130,12 +90,12 @@ disconnect(); } } - return ch; + return null; } @Override public boolean connect() { - if (connection != null && connection.isOpen()) { + if (isOpen()) { logger.atInfo().log(MSG("Already connected.")); return true; } @@ -156,7 +116,14 @@ factory.setPassword(amqp.password); } connection = factory.newConnection(); - connection.addShutdownListener(connectionListener); + connection.addShutdownListener( + cause -> { + if (cause.isInitiatedByApplication()) { + logger.atInfo().log(MSG("Connection closed by application.")); + } else { + logger.atWarning().log(MSG("Connection closed. Cause: %s"), cause.getMessage()); + } + }); logger.atInfo().log(MSG("Connection established.")); return true; }