JAMES-2541 Implement the MailQueue API on top of RabbitMQ Handle all fields of a mail
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/197ea1ed Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/197ea1ed Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/197ea1ed Branch: refs/heads/master Commit: 197ea1ed45738b6e04a09187997485051ae4d749 Parents: 5f68dd5 Author: Benoit Tellier <[email protected]> Authored: Wed Sep 5 16:13:43 2018 +0700 Committer: Benoit Tellier <[email protected]> Committed: Mon Sep 10 17:19:38 2018 +0700 ---------------------------------------------------------------------- server/queue/queue-rabbitmq/pom.xml | 24 +++++ .../apache/james/queue/rabbitmq/MailDTO.java | 104 ++++++++++++++++++- .../james/queue/rabbitmq/RabbitClient.java | 13 ++- .../james/queue/rabbitmq/RabbitMQMailQueue.java | 96 +++++++++++++++-- .../rabbitmq/RabbitMQMailQueueFactory.java | 18 ++-- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 94 +++++++---------- .../rabbitmq/RabbitMqMailQueueFactoryTest.java | 45 +++++++- 7 files changed, 306 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/197ea1ed/server/queue/queue-rabbitmq/pom.xml ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml index 2339392..1dc37ef 100644 --- a/server/queue/queue-rabbitmq/pom.xml +++ b/server/queue/queue-rabbitmq/pom.xml @@ -39,10 +39,34 @@ <dependencies> <dependency> <groupId>${james.groupId}</groupId> + <artifactId>apache-james-backends-cassandra</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>apache-james-backends-cassandra</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>blob-api</artifactId> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>blob-cassandra</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> <artifactId>james-server-core</artifactId> </dependency> <dependency> <groupId>${james.groupId}</groupId> + <artifactId>james-server-mail-store</artifactId> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> <artifactId>james-server-queue-api</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/james-project/blob/197ea1ed/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailDTO.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailDTO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailDTO.java index 3022154..76ed232 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailDTO.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailDTO.java @@ -19,49 +19,147 @@ package org.apache.james.queue.rabbitmq; +import java.time.Instant; import java.util.Collection; +import java.util.Map; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.james.blob.mail.MimeMessagePartsId; import org.apache.james.core.MailAddress; +import org.apache.james.util.SerializationUtil; +import org.apache.james.util.streams.Iterators; import org.apache.mailet.Mail; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; class MailDTO { - static MailDTO fromMail(Mail mail) { + static MailDTO fromMail(Mail mail, MimeMessagePartsId partsId) { return new MailDTO( mail.getRecipients().stream() .map(MailAddress::asString) .collect(Guavate.toImmutableList()), mail.getName(), - mail.getSender().asString()); + mail.getSender().asString(), + mail.getState(), + mail.getErrorMessage(), + mail.getLastUpdated().toInstant(), + serializedAttributes(mail), + mail.getRemoteAddr(), + mail.getRemoteHost(), + SerializationUtil.serialize(mail.getPerRecipientSpecificHeaders()), + partsId.getHeaderBlobId().asString(), + partsId.getBodyBlobId().asString()); + } + + private static ImmutableMap<String, String> serializedAttributes(Mail mail) { + return Iterators.toStream(mail.getAttributeNames()) + .collect(Guavate.toImmutableMap( + name -> name, + name -> SerializationUtil.serialize(mail.getAttribute(name)))); } private final ImmutableList<String> recipients; private final String name; private final String sender; + private final String state; + private final String errorMessage; + private final Instant lastUpdated; + private final ImmutableMap<String, String> attributes; + private final String remoteAddr; + private final String remoteHost; + private final String perRecipientHeaders; + private final String headerBlobId; + private final String bodyBlobId; @JsonCreator private MailDTO(@JsonProperty("recipients") ImmutableList<String> recipients, @JsonProperty("name") String name, - @JsonProperty("sender") String sender) { + @JsonProperty("sender") String sender, + @JsonProperty("state") String state, + @JsonProperty("errorMessage") String errorMessage, + @JsonProperty("lastUpdated") Instant lastUpdated, + @JsonProperty("attributes") ImmutableMap<String, String> attributes, + @JsonProperty("remoteAddr") String remoteAddr, + @JsonProperty("remoteHost") String remoteHost, + @JsonProperty("perRecipientHeaders") String perRecipientHeaders, + @JsonProperty("headerBlobId") String headerBlobId, + @JsonProperty("bodyBlobId") String bodyBlobId) { this.recipients = recipients; this.name = name; this.sender = sender; + this.state = state; + this.errorMessage = errorMessage; + this.lastUpdated = lastUpdated; + this.attributes = attributes; + this.remoteAddr = remoteAddr; + this.remoteHost = remoteHost; + this.perRecipientHeaders = perRecipientHeaders; + this.headerBlobId = headerBlobId; + this.bodyBlobId = bodyBlobId; } + @JsonProperty("recipients") Collection<String> getRecipients() { return recipients; } + @JsonProperty("name") String getName() { return name; } + @JsonProperty("sender") String getSender() { return sender; } + + @JsonProperty("state") + String getState() { + return state; + } + + @JsonProperty("errorMessage") + String getErrorMessage() { + return errorMessage; + } + + @JsonProperty("lastUpdated") + Instant getLastUpdated() { + return lastUpdated; + } + + @JsonProperty("attributes") + Map<String, String> getAttributes() { + return attributes; + } + + @JsonProperty("remoteAddr") + String getRemoteAddr() { + return remoteAddr; + } + + @JsonProperty("remoteHost") + String getRemoteHost() { + return remoteHost; + } + + @JsonProperty("perRecipientHeaders") + String getPerRecipientHeaders() { + return perRecipientHeaders; + } + + @JsonProperty("headerBlobId") + String getHeaderBlobId() { + return headerBlobId; + } + + @JsonProperty("bodyBlobId") + String getBodyBlobId() { + return bodyBlobId; + } } http://git-wip-us.apache.org/repos/asf/james-project/blob/197ea1ed/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java index 7439956..d5a945d 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitClient.java @@ -45,20 +45,19 @@ class RabbitClient { this.channel = channel; } - RabbitMQMailQueue attemptQueueCreation(MailQueueName name) { + void attemptQueueCreation(MailQueueName name) { try { - channel.exchangeDeclare(name.toRabbitExchangeName(), "direct", DURABLE); - channel.queueDeclare(name.toRabbitWorkQueueName(), DURABLE, !EXCLUSIVE, !AUTO_DELETE, NO_ARGUMENTS); - channel.queueBind(name.toRabbitWorkQueueName(), name.toRabbitExchangeName(), ROUTING_KEY); + channel.exchangeDeclare(name.toRabbitExchangeName().asString(), "direct", DURABLE); + channel.queueDeclare(name.toWorkQueueName().asString(), DURABLE, !EXCLUSIVE, !AUTO_DELETE, NO_ARGUMENTS); + channel.queueBind(name.toWorkQueueName().asString(), name.toRabbitExchangeName().asString(), ROUTING_KEY); } catch (IOException e) { throw new RuntimeException(e); } - return new RabbitMQMailQueue(name, this); } void publish(MailQueueName name, byte[] message) throws MailQueue.MailQueueException { try { - channel.basicPublish(name.toRabbitExchangeName(), ROUTING_KEY, new AMQP.BasicProperties(), message); + channel.basicPublish(name.toRabbitExchangeName().asString(), ROUTING_KEY, new AMQP.BasicProperties(), message); } catch (IOException e) { throw new MailQueue.MailQueueException("Unable to publish to RabbitMQ", e); } @@ -69,6 +68,6 @@ class RabbitClient { } Optional<GetResponse> poll(MailQueueName name) throws IOException { - return Optional.ofNullable(channel.basicGet(name.toRabbitWorkQueueName(), !AUTO_ACK)); + return Optional.ofNullable(channel.basicGet(name.toWorkQueueName().asString(), !AUTO_ACK)); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/197ea1ed/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java index 68a3d67..c40fe6b 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java @@ -20,13 +20,28 @@ package org.apache.james.queue.rabbitmq; import java.io.IOException; +import java.io.Serializable; +import java.util.Date; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import javax.inject.Inject; +import javax.mail.MessagingException; +import javax.mail.internet.AddressException; +import javax.mail.internet.MimeMessage; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.Store; +import org.apache.james.blob.mail.MimeMessagePartsId; import org.apache.james.core.MailAddress; import org.apache.james.queue.api.MailQueue; import org.apache.james.server.core.MailImpl; +import org.apache.james.util.SerializationUtil; import org.apache.mailet.Mail; +import org.apache.mailet.PerRecipientHeaders; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,10 +52,12 @@ import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.github.fge.lambdas.Throwing; import com.github.steveash.guavate.Guavate; +import com.google.common.annotations.VisibleForTesting; import com.nurkiewicz.asyncretry.AsyncRetryExecutor; import com.rabbitmq.client.GetResponse; public class RabbitMQMailQueue implements MailQueue { + private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQMailQueue.class); private static class NoMailYetException extends RuntimeException { @@ -72,13 +89,34 @@ public class RabbitMQMailQueue implements MailQueue { } } + static class Factory { + private final RabbitClient rabbitClient; + private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore; + private final BlobId.Factory blobIdFactory; + + @Inject + @VisibleForTesting Factory(RabbitClient rabbitClient, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore, BlobId.Factory blobIdFactory) { + this.rabbitClient = rabbitClient; + this.mimeMessageStore = mimeMessageStore; + this.blobIdFactory = blobIdFactory; + } + + RabbitMQMailQueue create(MailQueueName mailQueueName) { + return new RabbitMQMailQueue(mailQueueName, rabbitClient, mimeMessageStore, blobIdFactory); + } + } + private static final int TEN_MS = 10; private final MailQueueName name; private final RabbitClient rabbitClient; + private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore; + private final BlobId.Factory blobIdFactory; private final ObjectMapper objectMapper; - RabbitMQMailQueue(MailQueueName name, RabbitClient rabbitClient) { + RabbitMQMailQueue(MailQueueName name, RabbitClient rabbitClient, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore, BlobId.Factory blobIdFactory) { + this.mimeMessageStore = mimeMessageStore; + this.blobIdFactory = blobIdFactory; this.name = name; this.rabbitClient = rabbitClient; this.objectMapper = new ObjectMapper() @@ -102,11 +140,20 @@ public class RabbitMQMailQueue implements MailQueue { @Override public void enQueue(Mail mail) throws MailQueueException { - MailDTO mailDTO = MailDTO.fromMail(mail); + MimeMessagePartsId partsId = saveBlobs(mail).join(); + MailDTO mailDTO = MailDTO.fromMail(mail, partsId); byte[] message = getMessageBytes(mailDTO); rabbitClient.publish(name, message); } + private CompletableFuture<MimeMessagePartsId> saveBlobs(Mail mail) throws MailQueueException { + try { + return mimeMessageStore.save(mail.getMessage()); + } catch (MessagingException e) { + throw new MailQueueException("Error while saving blob", e); + } + } + private byte[] getMessageBytes(MailDTO mailDTO) throws MailQueueException { try { return objectMapper.writeValueAsBytes(mailDTO); @@ -115,7 +162,6 @@ public class RabbitMQMailQueue implements MailQueue { } } - @Override public MailQueueItem deQueue() throws MailQueueException { GetResponse getResponse = pollChannel(); @@ -147,13 +193,41 @@ public class RabbitMQMailQueue implements MailQueue { .orElseThrow(NoMailYetException::new); } - private Mail toMail(MailDTO dto) { - return new MailImpl( - dto.getName(), - MailAddress.getMailSender(dto.getSender()), - dto.getRecipients() - .stream() - .map(Throwing.<String, MailAddress>function(MailAddress::new).sneakyThrow()) - .collect(Guavate.toImmutableList())); + private Mail toMail(MailDTO dto) throws MailQueueException { + try { + MimeMessage mimeMessage = mimeMessageStore.read( + MimeMessagePartsId.builder() + .headerBlobId(blobIdFactory.from(dto.getHeaderBlobId())) + .bodyBlobId(blobIdFactory.from(dto.getBodyBlobId())) + .build()) + .join(); + + MailImpl mail = new MailImpl( + dto.getName(), + MailAddress.getMailSender(dto.getSender()), + dto.getRecipients() + .stream() + .map(Throwing.<String, MailAddress>function(MailAddress::new).sneakyThrow()) + .collect(Guavate.toImmutableList()), + mimeMessage); + + mail.setErrorMessage(dto.getErrorMessage()); + mail.setRemoteAddr(dto.getRemoteAddr()); + mail.setRemoteHost(dto.getRemoteHost()); + mail.setState(dto.getState()); + mail.setLastUpdated(new Date(dto.getLastUpdated().toEpochMilli())); + + dto.getAttributes() + .forEach((name, value) -> mail.setAttribute(name, SerializationUtil.<Serializable>deserialize(value))); + + Optional.ofNullable(SerializationUtil.<PerRecipientHeaders>deserialize(dto.getPerRecipientHeaders())) + .ifPresent(mail::addAllSpecificHeaderForRecipient); + + return mail; + } catch (AddressException e) { + throw new MailQueueException("Failed to parse mail address", e); + } catch (MessagingException e) { + throw new MailQueueException("Failed to generate mime message", e); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/197ea1ed/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java index 1e541aa..784cf50 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java @@ -29,17 +29,18 @@ import org.apache.james.queue.api.MailQueueFactory; import com.github.steveash.guavate.Guavate; import com.google.common.annotations.VisibleForTesting; -import com.rabbitmq.client.Connection; public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQueue> { private final RabbitClient rabbitClient; private final RabbitMQManagementApi mqManagementApi; + private final RabbitMQMailQueue.Factory mailQueueFactory; @VisibleForTesting @Inject - RabbitMQMailQueueFactory(Connection connection, RabbitMQManagementApi mqManagementApi) throws IOException { - this.rabbitClient = new RabbitClient(connection.createChannel()); + RabbitMQMailQueueFactory(RabbitClient rabbitClient, RabbitMQManagementApi mqManagementApi, RabbitMQMailQueue.Factory mailQueueFactory) throws IOException { + this.rabbitClient = rabbitClient; this.mqManagementApi = mqManagementApi; + this.mailQueueFactory = mailQueueFactory; } @Override @@ -51,20 +52,25 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu public RabbitMQMailQueue createQueue(String name) { MailQueueName mailQueueName = MailQueueName.fromString(name); return getQueue(mailQueueName) - .orElseGet(() -> rabbitClient.attemptQueueCreation(mailQueueName)); + .orElseGet(() -> attemptQueueCreation(mailQueueName)); } @Override public Set<RabbitMQMailQueue> listCreatedMailQueues() { return mqManagementApi.listCreatedMailQueueNames() - .map(name -> new RabbitMQMailQueue(name, rabbitClient)) + .map(mailQueueFactory::create) .collect(Guavate.toImmutableSet()); } + private RabbitMQMailQueue attemptQueueCreation(MailQueueName mailQueueName) { + rabbitClient.attemptQueueCreation(mailQueueName); + return mailQueueFactory.create(mailQueueName); + } + private Optional<RabbitMQMailQueue> getQueue(MailQueueName name) { return mqManagementApi.listCreatedMailQueueNames() .filter(name::equals) - .map(queueName -> new RabbitMQMailQueue(queueName, rabbitClient)) + .map(mailQueueFactory::create) .findFirst(); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/197ea1ed/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index 9573ab8..a75f761 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -24,87 +24,67 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.concurrent.TimeoutException; +import javax.mail.internet.MimeMessage; + import org.apache.http.client.utils.URIBuilder; +import org.apache.james.backends.cassandra.CassandraCluster; +import org.apache.james.backends.cassandra.DockerCassandraExtension; +import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration; +import org.apache.james.blob.api.HashBlobId; +import org.apache.james.blob.api.Store; +import org.apache.james.blob.cassandra.CassandraBlobModule; +import org.apache.james.blob.cassandra.CassandraBlobsDAO; +import org.apache.james.blob.mail.MimeMessagePartsId; +import org.apache.james.blob.mail.MimeMessageStore; import org.apache.james.queue.api.MailQueue; import org.apache.james.queue.api.MailQueueContract; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.extension.ExtendWith; -@ExtendWith(DockerRabbitMQExtension.class) +@ExtendWith({DockerRabbitMQExtension.class, DockerCassandraExtension.class}) public class RabbitMQMailQueueTest implements MailQueueContract { + private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); + + private static CassandraCluster cassandra; private RabbitMQMailQueueFactory mailQueueFactory; + @BeforeAll + static void setUpClass(DockerCassandraExtension.DockerCassandra dockerCassandra) { + cassandra = CassandraCluster.create(CassandraBlobModule.MODULE, dockerCassandra.getHost()); + } + @BeforeEach void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException, URISyntaxException { + CassandraBlobsDAO blobsDAO = new CassandraBlobsDAO(cassandra.getConf(), CassandraConfiguration.DEFAULT_CONFIGURATION, BLOB_ID_FACTORY); + Store<MimeMessage, MimeMessagePartsId> mimeMessageStore = MimeMessageStore.factory(blobsDAO).mimeMessageStore(); URI rabbitManagementUri = new URIBuilder() .setScheme("http") .setHost(rabbitMQ.getHostIp()) .setPort(rabbitMQ.getAdminPort()) .build(); - mailQueueFactory = new RabbitMQMailQueueFactory( - rabbitMQ.connectionFactory().newConnection(), - new RabbitMQManagementApi(rabbitManagementUri, new RabbitMQManagementCredentials("guest", "guest".toCharArray()))); - } - - @Override - public MailQueue getMailQueue() { - return mailQueueFactory.createQueue("spool"); - } - - @Disabled - @Override - public void queueShouldPreserveMimeMessage() { - - } - - @Disabled - @Override - public void queueShouldPreserveMailAttribute() { - - } - - @Disabled - @Override - public void queueShouldPreserveErrorMessage() { - - } - - @Disabled - @Override - public void queueShouldPreserveState() { - - } - - @Disabled - @Override - public void queueShouldPreserveRemoteAddress() { - - } - - @Disabled - @Override - public void queueShouldPreserveRemoteHost() { - + RabbitClient rabbitClient = new RabbitClient(rabbitMQ.connectionFactory().newConnection().createChannel()); + RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(rabbitClient, mimeMessageStore, BLOB_ID_FACTORY); + RabbitMQManagementApi mqManagementApi = new RabbitMQManagementApi(rabbitManagementUri, new RabbitMQManagementCredentials("guest", "guest".toCharArray())); + mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory); } - @Disabled - @Override - public void queueShouldPreserveLastUpdated() { - + @AfterEach + void tearDown() { + cassandra.clearTables(); } - @Disabled - @Override - public void queueShouldPreservePerRecipientHeaders() { - + @AfterAll + static void tearDownClass() { + cassandra.closeCluster(); } - @Disabled @Override - public void queueShouldPreserveNonStringMailAttribute() { - + public MailQueue getMailQueue() { + return mailQueueFactory.createQueue("spool"); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/197ea1ed/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java index 028d125..60b3e10 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java @@ -24,27 +24,64 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.concurrent.TimeoutException; +import javax.mail.internet.MimeMessage; + import org.apache.http.client.utils.URIBuilder; +import org.apache.james.backends.cassandra.CassandraCluster; +import org.apache.james.backends.cassandra.DockerCassandraExtension; +import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration; +import org.apache.james.blob.api.HashBlobId; +import org.apache.james.blob.api.Store; +import org.apache.james.blob.cassandra.CassandraBlobModule; +import org.apache.james.blob.cassandra.CassandraBlobsDAO; +import org.apache.james.blob.mail.MimeMessagePartsId; +import org.apache.james.blob.mail.MimeMessageStore; import org.apache.james.queue.api.MailQueueFactory; import org.apache.james.queue.api.MailQueueFactoryContract; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtendWith; -@ExtendWith(DockerRabbitMQExtension.class) +@ExtendWith({DockerRabbitMQExtension.class, DockerCassandraExtension.class}) class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQMailQueue> { + private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); + + private static CassandraCluster cassandra; private RabbitMQMailQueueFactory mailQueueFactory; + @BeforeAll + static void setUpClass(DockerCassandraExtension.DockerCassandra dockerCassandra) { + cassandra = CassandraCluster.create(CassandraBlobModule.MODULE, dockerCassandra.getHost()); + } + @BeforeEach void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException, URISyntaxException { + CassandraBlobsDAO blobsDAO = new CassandraBlobsDAO(cassandra.getConf(), CassandraConfiguration.DEFAULT_CONFIGURATION, BLOB_ID_FACTORY); + Store<MimeMessage, MimeMessagePartsId> mimeMessageStore = MimeMessageStore.factory(blobsDAO).mimeMessageStore(); + URI rabbitManagementUri = new URIBuilder() .setScheme("http") .setHost(rabbitMQ.getHostIp()) .setPort(rabbitMQ.getAdminPort()) .build(); - mailQueueFactory = new RabbitMQMailQueueFactory( - rabbitMQ.connectionFactory().newConnection(), - new RabbitMQManagementApi(rabbitManagementUri, new RabbitMQManagementCredentials("guest", "guest".toCharArray()))); + + RabbitClient rabbitClient = new RabbitClient(rabbitMQ.connectionFactory().newConnection().createChannel()); + RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(rabbitClient, mimeMessageStore, BLOB_ID_FACTORY); + RabbitMQManagementApi mqManagementApi = new RabbitMQManagementApi(rabbitManagementUri, new RabbitMQManagementCredentials("guest", "guest".toCharArray())); + mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory); + } + + @AfterEach + void tearDown() { + cassandra.clearTables(); + } + + @AfterAll + static void tearDownClass() { + cassandra.closeCluster(); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
