This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit d69b48f1276910d8e3cdf13e677fcafc818b65f7 Author: Benoit Tellier <[email protected]> AuthorDate: Mon Jul 27 16:14:03 2020 +0700 JAMES-3319 Effective blob deletion for RabbitMQMailQueue --- .../org/apache/james/queue/rabbitmq/Dequeuer.java | 2 +- .../apache/james/queue/rabbitmq/MailLoader.java | 5 +- .../james/queue/rabbitmq/MailWithEnqueueId.java | 9 ++- .../queue/rabbitmq/view/api/DeleteCondition.java | 13 +++- .../view/cassandra/CassandraMailQueueView.java | 25 +++++-- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 77 ++++++++++++++++++++++ .../rabbitmq/view/api/DeleteConditionTest.java | 6 +- .../CassandraMailQueueViewTestFactory.java | 4 +- 8 files changed, 123 insertions(+), 18 deletions(-) diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java index b477335..0241c46 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java @@ -132,7 +132,7 @@ class Dequeuer implements Closeable { if (success) { dequeueMetric.increment(); response.ack(); - mailQueueView.delete(DeleteCondition.withEnqueueId(mailWithEnqueueId.getEnqueueId())); + mailQueueView.delete(DeleteCondition.withEnqueueId(mailWithEnqueueId.getEnqueueId(), mailWithEnqueueId.getBlobIds())); } else { response.nack(REQUEUE); } diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java index b724e60..ffe765b 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java @@ -47,7 +47,10 @@ class MailLoader { Mono<MailWithEnqueueId> load(MailReferenceDTO dto) { return Mono.fromCallable(() -> dto.toMailReference(blobIdFactory)) .flatMap(mailReference -> buildMail(mailReference) - .map(mail -> new MailWithEnqueueId(mailReference.getEnqueueId(), mail))); + .map(mail -> new MailWithEnqueueId( + mailReference.getEnqueueId(), + mail, + mailReference.getPartsId()))); } private Mono<Mail> buildMail(MailReference mailReference) { diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailWithEnqueueId.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailWithEnqueueId.java index f256fbf..a8edc9c 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailWithEnqueueId.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailWithEnqueueId.java @@ -19,15 +19,18 @@ package org.apache.james.queue.rabbitmq; +import org.apache.james.blob.mail.MimeMessagePartsId; import org.apache.mailet.Mail; public class MailWithEnqueueId { private final EnqueueId enqueueId; private final Mail mail; + private final MimeMessagePartsId blobIds; - MailWithEnqueueId(EnqueueId enqueueId, Mail mail) { + MailWithEnqueueId(EnqueueId enqueueId, Mail mail, MimeMessagePartsId blobIds) { this.enqueueId = enqueueId; this.mail = mail; + this.blobIds = blobIds; } public EnqueueId getEnqueueId() { @@ -37,4 +40,8 @@ public class MailWithEnqueueId { public Mail getMail() { return mail; } + + public MimeMessagePartsId getBlobIds() { + return blobIds; + } } diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/DeleteCondition.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/DeleteCondition.java index 12aba7b..d0e8d04 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/DeleteCondition.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/DeleteCondition.java @@ -22,6 +22,7 @@ package org.apache.james.queue.rabbitmq.view.api; import java.util.Objects; import org.apache.commons.lang3.NotImplementedException; +import org.apache.james.blob.mail.MimeMessagePartsId; import org.apache.james.queue.api.ManageableMailQueue; import org.apache.james.queue.rabbitmq.EnqueueId; import org.apache.james.queue.rabbitmq.EnqueuedItem; @@ -114,15 +115,21 @@ public interface DeleteCondition { class WithEnqueueId implements DeleteCondition { private final EnqueueId enqueueId; + private final MimeMessagePartsId blobIds; - WithEnqueueId(EnqueueId enqueueId) { + WithEnqueueId(EnqueueId enqueueId, MimeMessagePartsId blobIds) { this.enqueueId = enqueueId; + this.blobIds = blobIds; } public EnqueueId getEnqueueId() { return enqueueId; } + public MimeMessagePartsId getBlobIds() { + return blobIds; + } + @Override public boolean shouldBeDeleted(EnqueuedItem enqueuedItem) { Preconditions.checkNotNull(enqueuedItem); @@ -191,9 +198,9 @@ public interface DeleteCondition { return new WithName(value); } - static WithEnqueueId withEnqueueId(EnqueueId value) { + static WithEnqueueId withEnqueueId(EnqueueId value, MimeMessagePartsId blobIds) { Preconditions.checkNotNull(value); - return new WithEnqueueId(value); + return new WithEnqueueId(value, blobIds); } static DeleteCondition all() { diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java index 72b26a3..4a9bad8 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java @@ -24,7 +24,11 @@ import static org.apache.james.util.FunctionalUtils.negate; import java.time.Instant; import javax.inject.Inject; +import javax.mail.internet.MimeMessage; +import org.apache.james.blob.api.Store; +import org.apache.james.blob.mail.MimeMessagePartsId; +import org.apache.james.blob.mail.MimeMessageStore; import org.apache.james.queue.api.ManageableMailQueue; import org.apache.james.queue.rabbitmq.EnqueueId; import org.apache.james.queue.rabbitmq.EnqueuedItem; @@ -45,40 +49,46 @@ public class CassandraMailQueueView implements MailQueueView<CassandraMailQueueB private final CassandraMailQueueMailStore storeHelper; private final CassandraMailQueueBrowser cassandraMailQueueBrowser; private final CassandraMailQueueMailDelete cassandraMailQueueMailDelete; + private final MimeMessageStore.Factory mimeMessageStoreFactory; @Inject public Factory(CassandraMailQueueMailStore storeHelper, CassandraMailQueueBrowser cassandraMailQueueBrowser, CassandraMailQueueMailDelete cassandraMailQueueMailDelete, EventsourcingConfigurationManagement eventsourcingConfigurationManagement, + MimeMessageStore.Factory mimeMessageStoreFactory, CassandraMailQueueViewConfiguration configuration) { this.storeHelper = storeHelper; this.cassandraMailQueueBrowser = cassandraMailQueueBrowser; this.cassandraMailQueueMailDelete = cassandraMailQueueMailDelete; + this.mimeMessageStoreFactory = mimeMessageStoreFactory; eventsourcingConfigurationManagement.registerConfiguration(configuration); } @Override public MailQueueView create(MailQueueName mailQueueName) { - return new CassandraMailQueueView(storeHelper, mailQueueName, cassandraMailQueueBrowser, cassandraMailQueueMailDelete); + return new CassandraMailQueueView(storeHelper, mailQueueName, cassandraMailQueueBrowser, cassandraMailQueueMailDelete, + mimeMessageStoreFactory.mimeMessageStore()); } } private final CassandraMailQueueMailStore storeHelper; private final CassandraMailQueueBrowser cassandraMailQueueBrowser; private final CassandraMailQueueMailDelete cassandraMailQueueMailDelete; + private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore; private final MailQueueName mailQueueName; CassandraMailQueueView(CassandraMailQueueMailStore storeHelper, MailQueueName mailQueueName, CassandraMailQueueBrowser cassandraMailQueueBrowser, - CassandraMailQueueMailDelete cassandraMailQueueMailDelete) { + CassandraMailQueueMailDelete cassandraMailQueueMailDelete, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore) { this.mailQueueName = mailQueueName; this.storeHelper = storeHelper; this.cassandraMailQueueBrowser = cassandraMailQueueBrowser; this.cassandraMailQueueMailDelete = cassandraMailQueueMailDelete; + this.mimeMessageStore = mimeMessageStore; } @Override @@ -123,7 +133,7 @@ public class CassandraMailQueueView implements MailQueueView<CassandraMailQueueB public long delete(DeleteCondition deleteCondition) { if (deleteCondition instanceof DeleteCondition.WithEnqueueId) { DeleteCondition.WithEnqueueId enqueueIdCondition = (DeleteCondition.WithEnqueueId) deleteCondition; - delete(enqueueIdCondition.getEnqueueId()).block(); + delete(enqueueIdCondition.getEnqueueId(), enqueueIdCondition.getBlobIds()).block(); return 1L; } return browseThenDelete(deleteCondition); @@ -133,15 +143,18 @@ public class CassandraMailQueueView implements MailQueueView<CassandraMailQueueB return cassandraMailQueueBrowser.browseReferences(mailQueueName) .map(EnqueuedItemWithSlicingContext::getEnqueuedItem) .filter(deleteCondition::shouldBeDeleted) - .flatMap(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getEnqueueId(), mailQueueName)) + .flatMap(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getEnqueueId(), mailQueueName) + .then(Mono.from(mimeMessageStore.delete(mailReference.getPartsId())))) .count() .doOnNext(ignored -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName)) .subscribeOn(Schedulers.elastic()) .block(); } - private Mono<Void> delete(EnqueueId enqueueId) { - return cassandraMailQueueMailDelete.considerDeleted(enqueueId, mailQueueName); + private Mono<Void> delete(EnqueueId enqueueId, + MimeMessagePartsId blobIds) { + return cassandraMailQueueMailDelete.considerDeleted(enqueueId, mailQueueName) + .then(Mono.from(mimeMessageStore.delete(blobIds))); } @Override diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index a8b044d..2da8199 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -19,12 +19,16 @@ package org.apache.james.queue.rabbitmq; +import static com.datastax.driver.core.querybuilder.QueryBuilder.select; import static java.time.temporal.ChronoUnit.HOURS; import static org.apache.james.backends.cassandra.Scenario.Builder.executeNormally; import static org.apache.james.backends.cassandra.Scenario.Builder.fail; import static org.apache.james.backends.cassandra.Scenario.Builder.returnEmpty; import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY; import static org.apache.james.queue.api.Mails.defaultMail; +import static org.apache.james.queue.api.Mails.defaultMailNoRecipient; +import static org.apache.mailet.base.MailAddressFixture.RECIPIENT1; +import static org.apache.mailet.base.MailAddressFixture.SENDER; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.mockito.ArgumentMatchers.any; @@ -48,6 +52,7 @@ import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule import org.apache.james.backends.rabbitmq.RabbitMQExtension; import org.apache.james.blob.api.BlobStore; import org.apache.james.blob.api.HashBlobId; +import org.apache.james.blob.cassandra.BlobTables; import org.apache.james.blob.cassandra.CassandraBlobModule; import org.apache.james.blob.cassandra.CassandraBlobStoreFactory; import org.apache.james.blob.mail.MimeMessageStore; @@ -175,6 +180,78 @@ class RabbitMQMailQueueTest { } @Test + void dequeueShouldDeleteBlobs(CassandraCluster cassandra) throws Exception { + String name1 = "myMail1"; + Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(getMailQueue().deQueue()); + getMailQueue().enQueue(defaultMail() + .name(name1) + .build()); + + dequeueFlux.take(1) + .flatMap(mailQueueItem -> Mono.fromCallable(() -> { + mailQueueItem.done(true); + return mailQueueItem; + })).blockLast(Duration.ofSeconds(10)); + + assertThat(cassandra.getConf().execute(select().from(BlobTables.DefaultBucketBlobTable.TABLE_NAME))) + .isEmpty(); + } + + @Test + void clearShouldDeleteBlobs(CassandraCluster cassandra) throws Exception { + String name1 = "myMail1"; + getMailQueue().enQueue(defaultMail() + .name(name1) + .build()); + + getManageableMailQueue().clear(); + + assertThat(cassandra.getConf().execute(select().from(BlobTables.DefaultBucketBlobTable.TABLE_NAME))) + .isEmpty(); + } + + @Test + void removeByNameShouldDeleteBlobs(CassandraCluster cassandra) throws Exception { + String name1 = "myMail1"; + getMailQueue().enQueue(defaultMail() + .name(name1) + .build()); + + getManageableMailQueue().remove(ManageableMailQueue.Type.Name, name1); + + assertThat(cassandra.getConf().execute(select().from(BlobTables.DefaultBucketBlobTable.TABLE_NAME))) + .isEmpty(); + } + + @Test + void removeByRecipientShouldDeleteBlobs(CassandraCluster cassandra) throws Exception { + String name1 = "myMail1"; + getMailQueue().enQueue(defaultMailNoRecipient() + .name(name1) + .recipient(RECIPIENT1) + .build()); + + getManageableMailQueue().remove(ManageableMailQueue.Type.Recipient, RECIPIENT1.asString()); + + assertThat(cassandra.getConf().execute(select().from(BlobTables.DefaultBucketBlobTable.TABLE_NAME))) + .isEmpty(); + } + + @Test + void removeBySenderShouldDeleteBlobs(CassandraCluster cassandra) throws Exception { + String name1 = "myMail1"; + getMailQueue().enQueue(defaultMail() + .name(name1) + .sender(SENDER) + .build()); + + getManageableMailQueue().remove(ManageableMailQueue.Type.Sender, SENDER.asString()); + + assertThat(cassandra.getConf().execute(select().from(BlobTables.DefaultBucketBlobTable.TABLE_NAME))) + .isEmpty(); + } + + @Test void browseAndDequeueShouldCombineWellWhenDifferentSlices() throws Exception { ManageableMailQueue mailQueue = getManageableMailQueue(); int emailCount = 5; diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/api/DeleteConditionTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/api/DeleteConditionTest.java index ddcd5da..2b800ed 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/api/DeleteConditionTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/api/DeleteConditionTest.java @@ -80,7 +80,7 @@ class DeleteConditionTest { @Test void withSenderShouldThrowOnNullCondition() { assertThatThrownBy(() -> - DeleteCondition.withEnqueueId(null)) + DeleteCondition.withEnqueueId(null, null)) .isInstanceOf(NullPointerException.class); } @@ -97,7 +97,7 @@ class DeleteConditionTest { .mimeMessagePartsId(MESSAGE_PARTS_ID) .build(); - assertThat(DeleteCondition.withEnqueueId(ENQUEUE_ID_1).shouldBeDeleted(enqueuedItem)) + assertThat(DeleteCondition.withEnqueueId(ENQUEUE_ID_1, MESSAGE_PARTS_ID).shouldBeDeleted(enqueuedItem)) .isTrue(); } @@ -114,7 +114,7 @@ class DeleteConditionTest { .mimeMessagePartsId(MESSAGE_PARTS_ID) .build(); - assertThat(DeleteCondition.withEnqueueId(ENQUEUE_ID_1).shouldBeDeleted(enqueuedItem)) + assertThat(DeleteCondition.withEnqueueId(ENQUEUE_ID_1, MESSAGE_PARTS_ID).shouldBeDeleted(enqueuedItem)) .isFalse(); } } diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java index 1caf7aa..0d611a8 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java @@ -25,12 +25,9 @@ import java.util.Optional; import org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration; import org.apache.james.blob.api.HashBlobId; import org.apache.james.blob.mail.MimeMessageStore; -import org.apache.james.eventsourcing.Event; import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStore; import org.apache.james.eventsourcing.eventstore.cassandra.EventStoreDao; import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer; -import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTO; -import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule; import org.apache.james.queue.rabbitmq.MailQueueName; import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration; import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfigurationModule; @@ -66,6 +63,7 @@ public class CassandraMailQueueViewTestFactory { cassandraMailQueueBrowser, cassandraMailQueueMailDelete, eventsourcingConfigurationManagement, + mimeMessageStoreFactory, configuration); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
