JAMES-2551 SimpleChannelPool should provide resilient channels & connections

Previously, SimpleChannelPool failed in HealthCheckTest: stopping and
restarting RabbitMQ closed the pool's single final Connection and Channel,
and they were never recovered. The pool now holds both in AtomicReferences
and, on each use, checks whether the connection or channel is still open,
creating a new one if it has been closed.


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/0e8dd583
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/0e8dd583
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/0e8dd583

Branch: refs/heads/master
Commit: 0e8dd5831cf31c77ae0a1b22947b00c99e898cbb
Parents: 6f76f9f
Author: duc <[email protected]>
Authored: Thu Oct 11 13:55:04 2018 +0700
Committer: Benoit Tellier <[email protected]>
Committed: Thu Oct 25 15:07:52 2018 +0700

----------------------------------------------------------------------
 .../backend/rabbitmq/SimpleChannelPool.java     | 59 +++++++++++++++-----
 1 file changed, 44 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/0e8dd583/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
----------------------------------------------------------------------
diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
index e887f72..31d8530 100644
--- 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java
@@ -19,37 +19,66 @@
 
 package org.apache.james.backend.rabbitmq;
 
-import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
 
+import javax.annotation.PreDestroy;
+
+import com.github.fge.lambdas.Throwing;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 
 public class SimpleChannelPool implements RabbitMQChannelPool {
-    private final Channel channel;
-    private final Connection connection;
+    private final AtomicReference<Channel> channelReference;
+    private final AtomicReference<Connection> connectionReference;
+    private final RabbitMQConnectionFactory connectionFactory;
 
-    public SimpleChannelPool(RabbitMQConnectionFactory factory) throws 
IOException {
-        this.connection = factory.create();
-        this.channel = connection.createChannel();
+    public SimpleChannelPool(RabbitMQConnectionFactory factory) {
+        this.connectionFactory = factory;
+        this.connectionReference = new AtomicReference<>();
+        this.channelReference = new AtomicReference<>();
     }
 
     @Override
     public synchronized  <T, E extends Throwable> T execute(RabbitFunction<T, 
E> f) throws E, ConnectionFailedException {
-        return f.execute(channel);
+        return f.execute(getResilientChannel());
     }
 
     @Override
     public synchronized  <E extends Throwable> void execute(RabbitConsumer<E> 
f) throws E, ConnectionFailedException {
-        f.execute(channel);
+        f.execute(getResilientChannel());
     }
 
+    @PreDestroy
     @Override
-    public void close() throws Exception {
-        if (channel.isOpen()) {
-            channel.close();
-        }
-        if (connection.isOpen()) {
-            connection.close();
-        }
+    public synchronized void close() {
+        Optional.ofNullable(channelReference.get())
+            .filter(Channel::isOpen)
+            
.ifPresent(Throwing.<Channel>consumer(Channel::close).sneakyThrow());
+
+        Optional.ofNullable(connectionReference.get())
+            .filter(Connection::isOpen)
+            
.ifPresent(Throwing.<Connection>consumer(Connection::close).sneakyThrow());
+    }
+
+    private Connection getResilientConnection() {
+        return connectionReference.updateAndGet(this::getOpenConnection);
+    }
+
+    private Connection getOpenConnection(Connection checkedConnection) {
+        return Optional.ofNullable(checkedConnection)
+            .filter(Connection::isOpen)
+            .orElseGet(connectionFactory::create);
+    }
+
+    private Channel getResilientChannel() {
+        return 
channelReference.updateAndGet(Throwing.unaryOperator(this::getOpenChannel));
+    }
+
+    private Channel getOpenChannel(Channel checkedChannel) {
+        return Optional.ofNullable(checkedChannel)
+            .filter(Channel::isOpen)
+            .orElseGet(Throwing.supplier(() -> 
getResilientConnection().createChannel())
+                .sneakyThrow());
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to