JAMES-2545 Implement a channel pool for the RabbitMQ connection
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/3313b53d Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/3313b53d Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/3313b53d Branch: refs/heads/master Commit: 3313b53d978e5ae52cccd725258bb9804bd4aeee Parents: 01738f6 Author: Matthieu Baechler <[email protected]> Authored: Mon Sep 10 18:11:41 2018 +0200 Committer: Benoit Tellier <[email protected]> Committed: Fri Sep 14 10:17:42 2018 +0700 ---------------------------------------------------------------------- backends-common/rabbitmq/pom.xml | 5 + .../backend/rabbitmq/RabbitChannelPool.java | 103 +++++++++++++++++++ .../james/queue/rabbitmq/RabbitClient.java | 43 ++++---- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 3 +- .../rabbitmq/RabbitMqMailQueueFactoryTest.java | 3 +- 5 files changed, 137 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/3313b53d/backends-common/rabbitmq/pom.xml ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/pom.xml b/backends-common/rabbitmq/pom.xml index d518753..9583901 100644 --- a/backends-common/rabbitmq/pom.xml +++ b/backends-common/rabbitmq/pom.xml @@ -63,6 +63,11 @@ <artifactId>commons-configuration</artifactId> </dependency> <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-pool2</artifactId> + <version>2.6.0</version> + </dependency> + <dependency> <groupId>javax.inject</groupId> <artifactId>javax.inject</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/james-project/blob/3313b53d/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java ---------------------------------------------------------------------- diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java new file mode 100644 index 0000000..c4a9fe6 --- /dev/null +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPool.java @@ -0,0 +1,103 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.backend.rabbitmq; + +import org.apache.commons.pool2.BasePooledObjectFactory; +import org.apache.commons.pool2.ObjectPool; +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.commons.pool2.impl.GenericObjectPool; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; + +public class RabbitChannelPool { + + private static class ChannelBasePooledObjectFactory extends BasePooledObjectFactory<Channel> { + private final Connection connection; + + public ChannelBasePooledObjectFactory(Connection connection) { + this.connection = connection; + } + + @Override + public Channel create() throws Exception { + return connection.createChannel(); + } + + @Override + public PooledObject<Channel> wrap(Channel obj) { + return new DefaultPooledObject<>(obj); + } + } + + @FunctionalInterface + public interface RabbitFunction<T, E extends Throwable> { + T execute(Channel channel) throws E; + } + + @FunctionalInterface + public interface RabbitConsumer<E extends Throwable> { + void execute(Channel channel) throws E; + } + + private final ObjectPool<Channel> pool; + + public RabbitChannelPool(Connection connection) { + pool = new GenericObjectPool<>( + new ChannelBasePooledObjectFactory(connection)); + } + + public <T, E extends Throwable> T execute(RabbitFunction<T, E> f) throws E { + Channel channel = borrowChannel(); + try { + return f.execute(channel); + } finally { + returnChannel(channel); + } + } + + + public <E extends Throwable> void execute(RabbitConsumer<E> f) throws E { + Channel channel = borrowChannel(); + try { + f.execute(channel); + } finally { + returnChannel(channel); + } + } + + private Channel borrowChannel() { + try { + return pool.borrowObject(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void returnChannel(Channel channel) { + try { + 
pool.returnObject(channel); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/3313b53d/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java index d5a945d..ae6b1be 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java @@ -22,11 +22,11 @@ package org.apache.james.queue.rabbitmq; import java.io.IOException; import java.util.Optional; +import org.apache.james.backend.rabbitmq.RabbitChannelPool; import org.apache.james.queue.api.MailQueue; import com.google.common.collect.ImmutableMap; import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.Channel; import com.rabbitmq.client.GetResponse; class RabbitClient { @@ -39,35 +39,42 @@ class RabbitClient { private static final ImmutableMap<String, Object> NO_ARGUMENTS = ImmutableMap.of(); private static final String ROUTING_KEY = ""; - private final Channel channel; + private final RabbitChannelPool channelPool; - RabbitClient(Channel channel) { - this.channel = channel; + RabbitClient(RabbitChannelPool channelPool) { + this.channelPool = channelPool; } void attemptQueueCreation(MailQueueName name) { - try { - channel.exchangeDeclare(name.toRabbitExchangeName().asString(), "direct", DURABLE); - channel.queueDeclare(name.toWorkQueueName().asString(), DURABLE, !EXCLUSIVE, !AUTO_DELETE, NO_ARGUMENTS); - channel.queueBind(name.toWorkQueueName().asString(), name.toRabbitExchangeName().asString(), ROUTING_KEY); - } catch (IOException e) { - throw new RuntimeException(e); - } + 
channelPool.execute(channel -> { + try { + channel.exchangeDeclare(name.toRabbitExchangeName().asString(), "direct", DURABLE); + channel.queueDeclare(name.toWorkQueueName().asString(), DURABLE, !EXCLUSIVE, !AUTO_DELETE, NO_ARGUMENTS); + channel.queueBind(name.toWorkQueueName().asString(), name.toRabbitExchangeName().asString(), ROUTING_KEY); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } void publish(MailQueueName name, byte[] message) throws MailQueue.MailQueueException { - try { - channel.basicPublish(name.toRabbitExchangeName().asString(), ROUTING_KEY, new AMQP.BasicProperties(), message); - } catch (IOException e) { - throw new MailQueue.MailQueueException("Unable to publish to RabbitMQ", e); - } + channelPool.execute(channel -> { + try { + channel.basicPublish(name.toRabbitExchangeName().asString(), ROUTING_KEY, new AMQP.BasicProperties(), message); + } catch (IOException e) { + throw new MailQueue.MailQueueException("Unable to publish to RabbitMQ", e); + } + }); } void ack(long deliveryTag) throws IOException { - channel.basicAck(deliveryTag, !MULTIPLE); + RabbitChannelPool.RabbitConsumer<IOException> consumer = channel -> channel.basicAck(deliveryTag, !MULTIPLE); + channelPool.execute(consumer); } Optional<GetResponse> poll(MailQueueName name) throws IOException { - return Optional.ofNullable(channel.basicGet(name.toWorkQueueName().asString(), !AUTO_ACK)); + RabbitChannelPool.RabbitFunction<Optional<GetResponse>, IOException> f = channel -> + Optional.ofNullable(channel.basicGet(name.toWorkQueueName().asString(), !AUTO_ACK)); + return channelPool.execute(f); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/3313b53d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index aaf57f7..c2d5c07 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -28,6 +28,7 @@ import javax.mail.internet.MimeMessage; import org.apache.http.client.utils.URIBuilder; import org.apache.james.backend.rabbitmq.DockerRabbitMQ; +import org.apache.james.backend.rabbitmq.RabbitChannelPool; import org.apache.james.backend.rabbitmq.ReusableDockerRabbitMQExtension; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.DockerCassandraExtension; @@ -70,7 +71,7 @@ public class RabbitMQMailQueueTest implements MailQueueContract { .setPort(rabbitMQ.getAdminPort()) .build(); - RabbitClient rabbitClient = new RabbitClient(rabbitMQ.connectionFactory().newConnection().createChannel()); + RabbitClient rabbitClient = new RabbitClient(new RabbitChannelPool(rabbitMQ.connectionFactory().newConnection())); RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(rabbitClient, mimeMessageStore, BLOB_ID_FACTORY); RabbitMQManagementApi mqManagementApi = new RabbitMQManagementApi(rabbitManagementUri, new RabbitMQManagementCredentials("guest", "guest".toCharArray())); mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory); http://git-wip-us.apache.org/repos/asf/james-project/blob/3313b53d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java index cf1cb88..bbb4734 100644 
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java @@ -28,6 +28,7 @@ import javax.mail.internet.MimeMessage; import org.apache.http.client.utils.URIBuilder; import org.apache.james.backend.rabbitmq.DockerRabbitMQ; +import org.apache.james.backend.rabbitmq.RabbitChannelPool; import org.apache.james.backend.rabbitmq.ReusableDockerRabbitMQExtension; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.DockerCassandraExtension; @@ -70,7 +71,7 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM .setPort(rabbitMQ.getAdminPort()) .build(); - RabbitClient rabbitClient = new RabbitClient(rabbitMQ.connectionFactory().newConnection().createChannel()); + RabbitClient rabbitClient = new RabbitClient(new RabbitChannelPool(rabbitMQ.connectionFactory().newConnection())); RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(rabbitClient, mimeMessageStore, BLOB_ID_FACTORY); RabbitMQManagementApi mqManagementApi = new RabbitMQManagementApi(rabbitManagementUri, new RabbitMQManagementCredentials("guest", "guest".toCharArray())); mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
