JAMES-2541 Concurrent version of Store<T>. One cannot get the desired concurrency when running on top of a fixed thread pool... The previous version of the code deadlocked.
Using byte as an intermediate representation I can interact with the object store in an async fashion (and avoid the dead lock) Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b4458677 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b4458677 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b4458677 Branch: refs/heads/master Commit: b44586778a54cd152d3c76fa70c3d3cb1ca2bb44 Parents: 6a29366 Author: Benoit Tellier <[email protected]> Authored: Thu Sep 6 17:31:25 2018 +0700 Committer: Benoit Tellier <[email protected]> Committed: Mon Sep 10 17:17:41 2018 +0700 ---------------------------------------------------------------------- .../java/org/apache/james/blob/api/Store.java | 22 ++++++++------ .../james/blob/mail/MimeMessageStore.java | 6 ++-- .../james/blob/mail/MimeMessageStoreTest.java | 10 +++--- .../cassandra/CassandraMailRepository.java | 32 +++++++++++--------- 4 files changed, 37 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/b4458677/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java index 4665c6b..47c4988 100644 --- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/Store.java @@ -62,12 +62,12 @@ public interface Store<T> { } interface Decoder<T> { - T decode(Map<BlobType, InputStream> streams); + T decode(Map<BlobType, byte[]> streams); } - Map<BlobType, BlobId> save(T t); + CompletableFuture<Map<BlobType, BlobId>> save(T t); - T read(Map<BlobType, BlobId> blobIds); + CompletableFuture<T> 
read(Map<BlobType, BlobId> blobIds); class Impl<T> implements Store<T> { private final Encoder<T> encoder; @@ -81,14 +81,14 @@ public interface Store<T> { } @Override - public Map<BlobType, BlobId> save(T t) { + public CompletableFuture<Map<BlobType, BlobId>> save(T t) { return FluentFutureStream.of( encoder.encode(t) .entrySet() .stream() .map(this::saveEntry)) - .join() - .collect(ImmutableMap.toImmutableMap(Pair::getKey, Pair::getValue)); + .completableFuture() + .thenApply(pairStream -> pairStream.collect(ImmutableMap.toImmutableMap(Pair::getKey, Pair::getValue))); } private CompletableFuture<Pair<BlobType, BlobId>> saveEntry(Map.Entry<BlobType, InputStream> entry) { @@ -97,12 +97,14 @@ public interface Store<T> { } @Override - public T read(Map<BlobType, BlobId> blobIds) { - ImmutableMap<BlobType, InputStream> binaries = blobIds.entrySet() + public CompletableFuture<T> read(Map<BlobType, BlobId> blobIds) { + CompletableFuture<ImmutableMap<BlobType, byte[]>> binaries = FluentFutureStream.of(blobIds.entrySet() .stream() - .map(entry -> Pair.of(entry.getKey(), blobStore.read(entry.getValue()))) + .map(entry -> blobStore.readBytes(entry.getValue()) + .thenApply(bytes -> Pair.of(entry.getKey(), bytes)))) .collect(ImmutableMap.toImmutableMap(Pair::getKey, Pair::getValue)); - return decoder.decode(binaries); + + return binaries.thenApply(decoder::decode); } } } http://git-wip-us.apache.org/repos/asf/james-project/blob/b4458677/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java ---------------------------------------------------------------------- diff --git a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java index 75cd973..5c2e444 100644 --- a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java +++ 
b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessageStore.java @@ -106,15 +106,15 @@ public class MimeMessageStore extends Store.Impl<MimeMessage> { static class MailDecoder implements Decoder<MimeMessage> { @Override - public MimeMessage decode(Map<BlobType, InputStream> streams) { + public MimeMessage decode(Map<BlobType, byte[]> streams) { Preconditions.checkNotNull(streams); Preconditions.checkArgument(streams.containsKey(HEADER_BLOB_TYPE)); Preconditions.checkArgument(streams.containsKey(BODY_BLOB_TYPE)); return toMimeMessage( new SequenceInputStream( - streams.get(HEADER_BLOB_TYPE), - streams.get(BODY_BLOB_TYPE))); + new ByteArrayInputStream(streams.get(HEADER_BLOB_TYPE)), + new ByteArrayInputStream(streams.get(BODY_BLOB_TYPE)))); } private MimeMessage toMimeMessage(InputStream inputStream) { http://git-wip-us.apache.org/repos/asf/james-project/blob/b4458677/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java ---------------------------------------------------------------------- diff --git a/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java b/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java index 3bc3440..5a896ea 100644 --- a/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java +++ b/server/blob/mail-store/src/test/java/org/apache/james/blob/mail/MimeMessageStoreTest.java @@ -70,9 +70,9 @@ class MimeMessageStoreTest { .setText("Important mail content") .build(); - Map<Store.BlobType, BlobId> parts = testee.save(message); + Map<Store.BlobType, BlobId> parts = testee.save(message).join(); - MimeMessage retrievedMessage = testee.read(parts); + MimeMessage retrievedMessage = testee.read(parts).join(); assertThat(MimeMessageUtil.asString(retrievedMessage)) .isEqualTo(MimeMessageUtil.asString(message)); @@ -86,9 +86,9 @@ class MimeMessageStoreTest { .setSubject("Important Mail") 
.build(); - Map<Store.BlobType, BlobId> parts = testee.save(message); + Map<Store.BlobType, BlobId> parts = testee.save(message).join(); - MimeMessage retrievedMessage = testee.read(parts); + MimeMessage retrievedMessage = testee.read(parts).join(); assertThat(MimeMessageUtil.asString(retrievedMessage)) .isEqualTo(MimeMessageUtil.asString(message)); @@ -105,7 +105,7 @@ class MimeMessageStoreTest { .setText("Important mail content") .build(); - Map<Store.BlobType, BlobId> parts = testee.save(message); + Map<Store.BlobType, BlobId> parts = testee.save(message).join(); SoftAssertions.assertSoftly( softly -> { http://git-wip-us.apache.org/repos/asf/james-project/blob/b4458677/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java index d2c8735..d0b4cbe 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java @@ -26,7 +26,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import javax.mail.MessagingException; -import javax.mail.internet.MimeMessage; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.Store; @@ -34,9 +33,11 @@ import org.apache.james.blob.mail.MimeMessageStore; import org.apache.james.mailrepository.api.MailKey; import org.apache.james.mailrepository.api.MailRepository; import org.apache.james.mailrepository.api.MailRepositoryUrl; +import 
org.apache.james.util.CompletableFutureUtil; import org.apache.james.util.FluentFutureStream; import org.apache.mailet.Mail; +import com.github.fge.lambdas.Throwing; import com.google.common.collect.ImmutableMap; public class CassandraMailRepository implements MailRepository { @@ -61,10 +62,10 @@ public class CassandraMailRepository implements MailRepository { public MailKey store(Mail mail) throws MessagingException { MailKey mailKey = MailKey.forMail(mail); - - Map<Store.BlobType, BlobId> parts = mimeMessageStore.save(mail.getMessage()); - - mailDAO.store(url, mail, parts.get(MimeMessageStore.HEADER_BLOB_TYPE), parts.get(MimeMessageStore.BODY_BLOB_TYPE)) + mimeMessageStore.save(mail.getMessage()) + .thenCompose(Throwing.function(parts -> mailDAO.store(url, mail, + parts.get(MimeMessageStore.HEADER_BLOB_TYPE), + parts.get(MimeMessageStore.BODY_BLOB_TYPE)))) .thenCompose(any -> keysDAO.store(url, mailKey)) .thenCompose(this::increaseSizeIfStored) .join(); @@ -88,21 +89,22 @@ public class CassandraMailRepository implements MailRepository { @Override public Mail retrieve(MailKey key) { - return mailDAO.read(url, key) - .thenApply(optional -> optional.map(this::toMail)) + return CompletableFutureUtil + .unwrap(mailDAO.read(url, key) + .thenApply(optional -> optional.map(this::toMail))) .join() .orElse(null); } - private Mail toMail(CassandraMailRepositoryMailDAO.MailDTO mailDTO) { - MimeMessage mimeMessage = mimeMessageStore - .read(ImmutableMap.of( - MimeMessageStore.HEADER_BLOB_TYPE, mailDTO.getHeaderBlobId(), - MimeMessageStore.BODY_BLOB_TYPE, mailDTO.getBodyBlobId())); + private CompletableFuture<Mail> toMail(CassandraMailRepositoryMailDAO.MailDTO mailDTO) { + Map<Store.BlobType, BlobId> parts = ImmutableMap.of( + MimeMessageStore.HEADER_BLOB_TYPE, mailDTO.getHeaderBlobId(), + MimeMessageStore.BODY_BLOB_TYPE, mailDTO.getBodyBlobId()); - return mailDTO.getMailBuilder() - .mimeMessage(mimeMessage) - .build(); + return mimeMessageStore.read(parts) + 
.thenApply(mimeMessage -> mailDTO.getMailBuilder() + .mimeMessage(mimeMessage) + .build()); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
