This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit f9a847739145a3baf11ba7aced4ba06157561295 Author: Benoit Tellier <[email protected]> AuthorDate: Fri Jul 24 16:52:27 2020 +0700 JAMES-3319 Actual Message & Attachment deletion for mailbox/cassandra --- .../CassandraMailboxSessionMapperFactory.java | 2 +- .../mailbox/cassandra/DeleteMessageListener.java | 20 ++++++++++-- .../cassandra/mail/CassandraMessageDAO.java | 38 ++++++++++++---------- .../cassandra/mail/MessageRepresentation.java | 15 ++++++++- .../cassandra/CassandraMailboxManagerTest.java | 24 ++++++++++++++ 5 files changed, 77 insertions(+), 22 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java index def8f71..e51364c 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java @@ -206,6 +206,6 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa public DeleteMessageListener deleteMessageListener() { return new DeleteMessageListener(imapUidDAO, messageIdDAO, messageDAO, attachmentDAOV2, ownerDAO, attachmentMessageIdDAO, aclMapper, userMailboxRightsDAO, applicableFlagDAO, firstUnseenDAO, deletedMessageDAO, - mailboxCounterDAO, mailboxRecentsDAO); + mailboxCounterDAO, mailboxRecentsDAO, blobStore); } } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java index e7e3627..d4a3ea7 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java @@ -26,6 +26,7 @@ import java.util.function.Predicate; import javax.inject.Inject; +import org.apache.james.blob.api.BlobStore; import org.apache.james.mailbox.acl.ACLDiff; import org.apache.james.mailbox.cassandra.ids.CassandraId; import org.apache.james.mailbox.cassandra.ids.CassandraMessageId; @@ -88,13 +89,14 @@ public class DeleteMessageListener implements MailboxListener.GroupMailboxListen private final CassandraDeletedMessageDAO deletedMessageDAO; private final CassandraMailboxCounterDAO counterDAO; private final CassandraMailboxRecentsDAO recentsDAO; + private final BlobStore blobStore; @Inject public DeleteMessageListener(CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO, CassandraMessageDAO messageDAO, CassandraAttachmentDAOV2 attachmentDAO, CassandraAttachmentOwnerDAO ownerDAO, CassandraAttachmentMessageIdDAO attachmentMessageIdDAO, CassandraACLMapper aclMapper, CassandraUserMailboxRightsDAO rightsDAO, CassandraApplicableFlagDAO applicableFlagDAO, - CassandraFirstUnseenDAO firstUnseenDAO, CassandraDeletedMessageDAO deletedMessageDAO, CassandraMailboxCounterDAO counterDAO, CassandraMailboxRecentsDAO recentsDAO) { + CassandraFirstUnseenDAO firstUnseenDAO, CassandraDeletedMessageDAO deletedMessageDAO, CassandraMailboxCounterDAO counterDAO, CassandraMailboxRecentsDAO recentsDAO, BlobStore blobStore) { this.imapUidDAO = imapUidDAO; this.messageIdDAO = messageIdDAO; this.messageDAO = messageDAO; @@ -108,6 +110,7 @@ public class DeleteMessageListener implements MailboxListener.GroupMailboxListen this.deletedMessageDAO = deletedMessageDAO; this.counterDAO = counterDAO; this.recentsDAO = recentsDAO; + this.blobStore = blobStore; } @Override @@ -172,6 +175,7 @@ public class DeleteMessageListener implements MailboxListener.GroupMailboxListen .filterWhen(this::isReferenced) .flatMap(id -> readMessage(id) .flatMap(message -> deleteUnreferencedAttachments(message).thenReturn(message)) + .flatMap(this::deleteMessageBlobs) .flatMap(this::deleteAttachmentMessageIds) .then(messageDAO.delete(messageId))); } @@ -181,10 +185,19 @@ public class DeleteMessageListener implements MailboxListener.GroupMailboxListen .filterWhen(id -> isReferenced(id, excludedId)) .flatMap(id -> readMessage(id) .flatMap(message -> deleteUnreferencedAttachments(message).thenReturn(message)) + .flatMap(this::deleteMessageBlobs) .flatMap(this::deleteAttachmentMessageIds) .then(messageDAO.delete(messageId))); } + private Mono<MessageRepresentation> deleteMessageBlobs(MessageRepresentation message) { + return Flux.merge( + blobStore.delete(blobStore.getDefaultBucketName(), message.getHeaderId()), + blobStore.delete(blobStore.getDefaultBucketName(), message.getBodyId())) + .then() + .thenReturn(message); + } + private Mono<MessageRepresentation> readMessage(CassandraMessageId id) { return messageDAO.retrieveMessage(id, MessageMapper.FetchType.Metadata); } @@ -193,7 +206,10 @@ public class DeleteMessageListener implements MailboxListener.GroupMailboxListen return Flux.fromIterable(message.getAttachments()) .filterWhen(attachment -> ownerDAO.retrieveOwners(attachment.getAttachmentId()).hasElements().map(negate())) .filterWhen(attachment -> hasOtherMessagesReferences(message, attachment)) - .concatMap(attachment -> attachmentDAO.delete(attachment.getAttachmentId())) + .concatMap(attachment -> attachmentDAO.getAttachment(attachment.getAttachmentId()) + .map(CassandraAttachmentDAOV2.DAOAttachment::getBlobId) + .flatMap(blobId -> Mono.from(blobStore.delete(blobStore.getDefaultBucketName(), blobId))) + .then(attachmentDAO.delete(attachment.getAttachmentId()))) .then(); } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java index a61d01f..7e7f06b 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java @@ -245,7 +245,11 @@ public class CassandraMessageDAO { } Row row = rows.one(); - return buildContentRetriever(fetchType, row).map(content -> + BlobId headerId = retrieveBlobId(HEADER_CONTENT, row); + BlobId bodyId = retrieveBlobId(BODY_CONTENT, row); + int bodyStartOctet = row.getInt(BODY_START_OCTET); + + return buildContentRetriever(fetchType, headerId, bodyId, bodyStartOctet).map(content -> new MessageRepresentation( cassandraMessageId, row.getTimestamp(INTERNAL_DATE), @@ -253,7 +257,9 @@ public class CassandraMessageDAO { row.getInt(BODY_START_OCTET), new SharedByteArrayInputStream(content), getPropertyBuilder(row), - getAttachments(row).collect(Guavate.toImmutableList()))); + getAttachments(row).collect(Guavate.toImmutableList()), + headerId, + bodyId)); } private PropertyBuilder getPropertyBuilder(Row row) { @@ -293,15 +299,15 @@ public class CassandraMessageDAO { .setUUID(MESSAGE_ID, messageId.get())); } - private Mono<byte[]> buildContentRetriever(FetchType fetchType, Row row) { + private Mono<byte[]> buildContentRetriever(FetchType fetchType, BlobId headerId, BlobId bodyId, int bodyStartOctet) { switch (fetchType) { case Full: - return getFullContent(row); + return getFullContent(headerId, bodyId); case Headers: - return getHeaderContent(row); + return getContent(headerId); case Body: - return getBodyContent(row) - .map(data -> Bytes.concat(new byte[row.getInt(BODY_START_OCTET)], data)); + return getContent(bodyId) + .map(data -> Bytes.concat(new byte[bodyStartOctet], data)); case Metadata: return Mono.just(EMPTY_BYTE_ARRAY); default: @@ -309,20 +315,16 @@ public class CassandraMessageDAO { } } - private Mono<byte[]> getFullContent(Row row) { - return getHeaderContent(row) - .zipWith(getBodyContent(row), Bytes::concat); - } - - private Mono<byte[]> getBodyContent(Row row) { - return getFieldContent(BODY_CONTENT, row); + private Mono<byte[]> getFullContent(BlobId headerId, BlobId bodyId) { + return getContent(headerId) + .zipWith(getContent(bodyId), Bytes::concat); } - private Mono<byte[]> getHeaderContent(Row row) { - return getFieldContent(HEADER_CONTENT, row); + private Mono<byte[]> getContent(BlobId blobId) { + return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), blobId)); } - private Mono<byte[]> getFieldContent(String field, Row row) { - return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), blobIdFactory.from(row.getString(field)))); + private BlobId retrieveBlobId(String field, Row row) { + return blobIdFactory.from(row.getString(field)); } } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java index 7a9a0b1..7ac496b 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/MessageRepresentation.java @@ -24,6 +24,7 @@ import java.util.List; import javax.mail.util.SharedByteArrayInputStream; +import org.apache.james.blob.api.BlobId; import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; import org.apache.james.mailbox.model.MessageAttachmentMetadata; import org.apache.james.mailbox.model.MessageId; @@ -38,9 +39,11 @@ public class MessageRepresentation { private final SharedByteArrayInputStream content; private final PropertyBuilder propertyBuilder; private final List<MessageAttachmentRepresentation> attachments; + private final BlobId headerId; + private final BlobId bodyId; public MessageRepresentation(MessageId messageId, Date internalDate, Long size, Integer bodySize, SharedByteArrayInputStream content, - PropertyBuilder propertyBuilder, List<MessageAttachmentRepresentation> attachments) { + PropertyBuilder propertyBuilder, List<MessageAttachmentRepresentation> attachments, BlobId headerId, BlobId bodyId) { this.messageId = messageId; this.internalDate = internalDate; this.size = size; @@ -48,6 +51,8 @@ public class MessageRepresentation { this.content = content; this.propertyBuilder = propertyBuilder; this.attachments = attachments; + this.headerId = headerId; + this.bodyId = bodyId; } public SimpleMailboxMessage toMailboxMessage(ComposedMessageIdWithMetaData metadata, List<MessageAttachmentMetadata> attachments) { @@ -81,4 +86,12 @@ public class MessageRepresentation { public List<MessageAttachmentRepresentation> getAttachments() { return attachments; } + + public BlobId getHeaderId() { + return headerId; + } + + public BlobId getBodyId() { + return bodyId; + } } diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java index 328fcc6..f2a98f2 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java @@ -18,6 +18,7 @@ ****************************************************************/ package org.apache.james.mailbox.cassandra; +import static com.datastax.driver.core.querybuilder.QueryBuilder.select; import static org.apache.james.backends.cassandra.Scenario.Builder.fail; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; @@ -34,6 +35,7 @@ import org.apache.james.backends.cassandra.init.configuration.CassandraConfigura import org.apache.james.backends.cassandra.utils.CassandraUtils; import org.apache.james.blob.api.BlobStore; import org.apache.james.blob.api.HashBlobId; +import org.apache.james.blob.cassandra.BlobTables; import org.apache.james.core.Username; import org.apache.james.mailbox.MailboxManagerTest; import org.apache.james.mailbox.MailboxSession; @@ -160,6 +162,28 @@ public class CassandraMailboxManagerTest extends MailboxManagerTest<CassandraMai } @Test + void deleteMailboxShouldDeleteMessageAndAttachmentBlobs(CassandraCluster cassandraCluster) throws Exception { + inboxManager.appendMessage(MessageManager.AppendCommand.builder() + .build(ClassLoaderUtils.getSystemResourceAsByteArray("eml/emailWithOnlyAttachment.eml")), session); + + mailboxManager.deleteMailbox(inbox, session); + + assertThat(cassandraCluster.getConf().execute(select().from(BlobTables.DefaultBucketBlobTable.TABLE_NAME))) + .isEmpty(); + } + + @Test + void deleteMessageShouldDeleteMessageAndAttachmentBlobs(CassandraCluster cassandraCluster) throws Exception { + AppendResult appendResult = inboxManager.appendMessage(MessageManager.AppendCommand.builder() + .build(ClassLoaderUtils.getSystemResourceAsByteArray("eml/emailWithOnlyAttachment.eml")), session); + + inboxManager.delete(ImmutableList.of(appendResult.getId().getUid()), session); + + assertThat(cassandraCluster.getConf().execute(select().from(BlobTables.DefaultBucketBlobTable.TABLE_NAME))) + .isEmpty(); + } + + @Test void deleteMailboxShouldEventuallyUnreferenceMessageMetadataWhenDeleteAttachmentFails(CassandraCluster cassandraCluster) throws Exception { AppendResult appendResult = inboxManager.appendMessage(MessageManager.AppendCommand.builder() .build(ClassLoaderUtils.getSystemResourceAsByteArray("eml/emailWithOnlyAttachment.eml")), session); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
