JAMES-2082 Add CassandraMessageDAOV2
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/3b621a3d Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/3b621a3d Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/3b621a3d Branch: refs/heads/master Commit: 3b621a3d6988e3cbef9affe9572297d84584d0b7 Parents: 3eca947 Author: benwa <btell...@linagora.com> Authored: Thu Jul 6 17:16:19 2017 +0700 Committer: Antoine Duprat <adup...@linagora.com> Committed: Mon Jul 10 14:23:55 2017 +0200 ---------------------------------------------------------------------- .../cassandra/mail/CassandraMessageDAOV2.java | 367 +++++++++++++++++++ .../mail/CassandraMessageDAOV2Test.java | 183 +++++++++ 2 files changed, 550 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/3b621a3d/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java new file mode 100644 index 0000000..170d80d --- /dev/null +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2.java @@ -0,0 +1,367 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. 
You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.cassandra.mail; + +import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; +import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; +import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; +import static com.datastax.driver.core.querybuilder.QueryBuilder.select; +import static org.apache.james.mailbox.cassandra.table.CassandraMessageIds.MESSAGE_ID; +import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.ATTACHMENTS; +import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BODY; +import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BODY_CONTENT; +import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BODY_OCTECTS; +import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BODY_START_OCTET; +import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.FIELDS; +import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.FULL_CONTENT_OCTETS; +import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.HEADERS; +import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.HEADER_CONTENT; +import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.INTERNAL_DATE; +import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.METADATA; +import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.PROPERTIES; +import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.TABLE_NAME; +import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.TEXTUAL_LINE_COUNT; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import javax.inject.Inject; +import javax.mail.util.SharedByteArrayInputStream; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.james.backends.cassandra.init.CassandraTypesProvider; +import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; +import org.apache.james.mailbox.cassandra.BlobId; +import org.apache.james.mailbox.cassandra.CassandraMessageId; +import org.apache.james.mailbox.cassandra.Limit; +import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.Attachments; +import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.Properties; +import org.apache.james.mailbox.exception.MailboxException; +import org.apache.james.mailbox.model.AttachmentId; +import org.apache.james.mailbox.model.Cid; +import org.apache.james.mailbox.model.ComposedMessageId; +import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; +import org.apache.james.mailbox.model.MessageAttachment; +import org.apache.james.mailbox.store.mail.MessageMapper.FetchType; +import org.apache.james.mailbox.store.mail.model.MailboxMessage; +import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder; +import org.apache.james.mailbox.store.mail.model.impl.SimpleProperty; +import org.apache.james.util.CompletableFutureUtil; +import org.apache.james.util.FluentFutureStream; +import org.apache.james.util.streams.JamesCollectors; + +import 
com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.UDTValue;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.github.steveash.guavate.Guavate;
import com.google.common.primitives.Bytes;

/**
 * Data access object for the V2 message table.
 *
 * <p>Message metadata, properties and attachment references are written to {@code TABLE_NAME};
 * header and body bytes are delegated to {@link CassandraBlobsDAO} and only their blob ids are
 * kept in the message row.
 */
public class CassandraMessageDAOV2 {
    /** Number of message ids grouped together when reading. */
    public static final int CHUNK_SIZE_ON_READ = 100;
    /** Value stored for the textual line count when the message does not provide one. */
    public static final long DEFAULT_LONG_VALUE = 0L;
    /** Value stored in a blob id column when no blob id is available. */
    public static final String DEFAULT_OBJECT_VALUE = null;
    private static final byte[] EMPTY_BYTE_ARRAY = {};

    private final CassandraAsyncExecutor cassandraAsyncExecutor;
    private final CassandraTypesProvider typesProvider;
    private final CassandraBlobsDAO blobsDAO;
    private final PreparedStatement insert;
    private final PreparedStatement delete;
    private final PreparedStatement selectMetadata;
    private final PreparedStatement selectHeaders;
    private final PreparedStatement selectFields;
    private final PreparedStatement selectBody;

    /**
     * Prepares every statement used by this DAO against the given session.
     *
     * @param session Cassandra session used to prepare and execute statements
     * @param typesProvider provider of the user defined types used for properties and attachments
     * @param blobsDAO storage for header and body content
     */
    @Inject
    public CassandraMessageDAOV2(Session session, CassandraTypesProvider typesProvider, CassandraBlobsDAO blobsDAO) {
        this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
        this.typesProvider = typesProvider;
        this.blobsDAO = blobsDAO;
        this.insert = prepareInsert(session);
        this.delete = prepareDelete(session);
        this.selectMetadata = prepareSelect(session, METADATA);
        this.selectHeaders = prepareSelect(session, HEADERS);
        this.selectFields = prepareSelect(session, FIELDS);
        this.selectBody = prepareSelect(session, BODY);
    }

    /** Prepares a select of the given columns for a single message id. */
    private PreparedStatement prepareSelect(Session session, String[] fields) {
        return session.prepare(select(fields)
            .from(TABLE_NAME)
            .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
    }

    /** Prepares the insert statement binding every column written by {@link #save}. */
    private PreparedStatement prepareInsert(Session session) {
        return session.prepare(insertInto(TABLE_NAME)
            .value(MESSAGE_ID, bindMarker(MESSAGE_ID))
            .value(INTERNAL_DATE, bindMarker(INTERNAL_DATE))
            .value(BODY_START_OCTET, bindMarker(BODY_START_OCTET))
            .value(FULL_CONTENT_OCTETS, bindMarker(FULL_CONTENT_OCTETS))
            .value(BODY_OCTECTS, bindMarker(BODY_OCTECTS))
            .value(BODY_CONTENT, bindMarker(BODY_CONTENT))
            .value(HEADER_CONTENT, bindMarker(HEADER_CONTENT))
            .value(PROPERTIES, bindMarker(PROPERTIES))
            .value(TEXTUAL_LINE_COUNT, bindMarker(TEXTUAL_LINE_COUNT))
            .value(ATTACHMENTS, bindMarker(ATTACHMENTS)));
    }

    /** Prepares the statement deleting the row of a single message id. */
    private PreparedStatement prepareDelete(Session session) {
        return session.prepare(QueryBuilder.delete()
            .from(TABLE_NAME)
            .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
    }

    /**
     * Saves the message: header and body content first, then the message row referencing them.
     *
     * @param message message to store; its message id must be a {@link CassandraMessageId}
     * @return a future completing once the message row has been written
     * @throws MailboxException when the message content cannot be read
     */
    public CompletableFuture<Void> save(MailboxMessage message) throws MailboxException {
        return saveContent(message).thenCompose(pair ->
            cassandraAsyncExecutor.executeVoid(boundWriteStatement(message, pair)));
    }

    /**
     * Stores body and header content through the blobs DAO.
     *
     * @return a future pair of (body blob id, header blob id)
     * @throws MailboxException when reading the message streams fails
     */
    private CompletableFuture<Pair<Optional<BlobId>, Optional<BlobId>>> saveContent(MailboxMessage message) throws MailboxException {
        try {
            // Both saves are issued before being composed, so neither waits for the other to start.
            CompletableFuture<Optional<BlobId>> bodyContent = blobsDAO.save(
                IOUtils.toByteArray(message.getBodyContent()));
            CompletableFuture<Optional<BlobId>> headerContent = blobsDAO.save(
                IOUtils.toByteArray(message.getHeaderContent()));

            return bodyContent.thenCompose(bodyContentId ->
                headerContent.thenApply(headerContentId ->
                    Pair.of(bodyContentId, headerContentId)));
        } catch (IOException e) {
            throw new MailboxException("Error saving mail content", e);
        }
    }

    /**
     * Binds the insert statement for the given message.
     *
     * @param message message being stored
     * @param pair (body blob id, header blob id) as returned by {@link #saveContent}
     */
    private BoundStatement boundWriteStatement(MailboxMessage message, Pair<Optional<BlobId>, Optional<BlobId>> pair) {
        CassandraMessageId messageId = (CassandraMessageId) message.getMessageId();
        return insert.bind()
            .setUUID(MESSAGE_ID, messageId.get())
            .setTimestamp(INTERNAL_DATE, message.getInternalDate())
            // The body starts right after the headers: full size minus body size.
            .setInt(BODY_START_OCTET, (int) (message.getFullContentOctets() - message.getBodyOctets()))
            .setLong(FULL_CONTENT_OCTETS, message.getFullContentOctets())
            .setLong(BODY_OCTECTS, message.getBodyOctets())
            // An absent blob id is written as null; getFieldContent handles that case on read.
            .setString(BODY_CONTENT, pair.getLeft().map(BlobId::getId).orElse(DEFAULT_OBJECT_VALUE))
            .setString(HEADER_CONTENT, pair.getRight().map(BlobId::getId).orElse(DEFAULT_OBJECT_VALUE))
            .setLong(TEXTUAL_LINE_COUNT, Optional.ofNullable(message.getTextualLineCount()).orElse(DEFAULT_LONG_VALUE))
            .setList(PROPERTIES, message.getProperties().stream()
                .map(x -> typesProvider.getDefinedUserType(PROPERTIES)
                    .newValue()
                    .setString(Properties.NAMESPACE, x.getNamespace())
                    .setString(Properties.NAME, x.getLocalName())
                    .setString(Properties.VALUE, x.getValue()))
                .collect(Collectors.toList()))
            .setList(ATTACHMENTS, message.getAttachments().stream()
                .map(this::toUDT)
                .collect(Guavate.toImmutableList()));
    }

    /** Converts an attachment reference into its user defined type value. */
    private UDTValue toUDT(MessageAttachment messageAttachment) {
        return typesProvider.getDefinedUserType(ATTACHMENTS)
            .newValue()
            .setString(Attachments.ID, messageAttachment.getAttachmentId().getId())
            .setString(Attachments.NAME, messageAttachment.getName().orNull())
            .setString(Attachments.CID, messageAttachment.getCid().transform(Cid::getValue).orNull())
            .setBool(Attachments.IS_INLINE, messageAttachment.isInline());
    }

    /**
     * Retrieves the messages matching the given ids.
     *
     * <p>Duplicate ids are dropped, the limit is applied, and the remaining ids are read in
     * groups of {@link #CHUNK_SIZE_ON_READ}.
     *
     * @param messageIds ids (with flags and mod-seq) of the messages to read
     * @param fetchType how much of each message must be loaded
     * @param limit maximum number of ids to read
     * @return a future stream holding one result per id read, found or not
     */
    public CompletableFuture<Stream<MessageResult>> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
        return CompletableFutureUtil.chainAll(
                limit.applyOnStream(messageIds.stream().distinct())
                    .collect(JamesCollectors.chunker(CHUNK_SIZE_ON_READ)),
                ids -> rowToMessages(fetchType, ids))
            .thenApply(stream -> stream.flatMap(Function.identity()));
    }

    /** Reads every id of one chunk and gathers the results into a single future stream. */
    private CompletableFuture<Stream<MessageResult>> rowToMessages(FetchType fetchType, Collection<ComposedMessageIdWithMetaData> ids) {
        return FluentFutureStream.of(
                ids.stream()
                    .map(id -> retrieveRow(id, fetchType)
                        .thenCompose((ResultSet resultSet) -> message(resultSet, id, fetchType))))
            .completableFuture();
    }

    /** Executes the select matching the fetch type for one message id. */
    private CompletableFuture<ResultSet> retrieveRow(ComposedMessageIdWithMetaData messageId, FetchType fetchType) {
        CassandraMessageId cassandraMessageId = (CassandraMessageId) messageId.getComposedMessageId().getMessageId();

        return cassandraAsyncExecutor.execute(retrieveSelect(fetchType)
            .bind()
            .setUUID(MESSAGE_ID, cassandraMessageId.get()));
    }

    /**
     * Builds the result for one id from its result set.
     *
     * @return a not-found result when the result set is empty, otherwise the message built from
     *         the first row once its content has been loaded
     */
    private CompletableFuture<MessageResult> message(ResultSet rows, ComposedMessageIdWithMetaData messageIdWithMetaData, FetchType fetchType) {
        ComposedMessageId messageId = messageIdWithMetaData.getComposedMessageId();

        if (rows.isExhausted()) {
            return CompletableFuture.completedFuture(notFound(messageIdWithMetaData));
        }

        Row row = rows.one();
        CompletableFuture<byte[]> contentFuture = buildContentRetriever(fetchType).apply(row);

        return contentFuture.thenApply(content -> {
            MessageWithoutAttachment messageWithoutAttachment =
                new MessageWithoutAttachment(
                    messageId.getMessageId(),
                    row.getTimestamp(INTERNAL_DATE),
                    row.getLong(FULL_CONTENT_OCTETS),
                    row.getInt(BODY_START_OCTET),
                    new SharedByteArrayInputStream(content),
                    messageIdWithMetaData.getFlags(),
                    getPropertyBuilder(row),
                    messageId.getMailboxId(),
                    messageId.getUid(),
                    messageIdWithMetaData.getModSeq());
            return found(Pair.of(messageWithoutAttachment, getAttachments(row, fetchType)));
        });
    }

    /** Rebuilds the property builder (properties and textual line count) from a row. */
    private PropertyBuilder getPropertyBuilder(Row row) {
        PropertyBuilder property = new PropertyBuilder(
            row.getList(PROPERTIES, UDTValue.class).stream()
                .map(x -> new SimpleProperty(x.getString(Properties.NAMESPACE), x.getString(Properties.NAME), x.getString(Properties.VALUE)))
                .collect(Collectors.toList()));
        property.setTextualLineCount(row.getLong(TEXTUAL_LINE_COUNT));
        return property;
    }

    /** Returns the attachment references of a row; only Full and Body fetches load them. */
    private Stream<MessageAttachmentRepresentation> getAttachments(Row row, FetchType fetchType) {
        switch (fetchType) {
            case Full:
            case Body:
                List<UDTValue> udtValues = row.getList(ATTACHMENTS, UDTValue.class);

                return attachmentByIds(udtValues);
            default:
                return Stream.of();
        }
    }

    /** Maps every stored attachment value to its representation. */
    private Stream<MessageAttachmentRepresentation> attachmentByIds(List<UDTValue> udtValues) {
        return udtValues.stream()
            .map(this::messageAttachmentByIdFrom);
    }

    /** Converts one stored attachment value to its representation. */
    private MessageAttachmentRepresentation messageAttachmentByIdFrom(UDTValue udtValue) {
        return MessageAttachmentRepresentation.builder()
            .attachmentId(AttachmentId.from(udtValue.getString(Attachments.ID)))
            .name(udtValue.getString(Attachments.NAME))
            .cid(Optional.ofNullable(udtValue.getString(Attachments.CID)).map(Cid::from))
            .isInline(udtValue.getBool(Attachments.IS_INLINE))
            .build();
    }

    /** Picks the prepared select matching the fetch type. */
    private PreparedStatement retrieveSelect(FetchType fetchType) {
        switch (fetchType) {
            case Body:
                return selectBody;
            case Full:
                return selectFields;
            case Headers:
                return selectHeaders;
            case Metadata:
                return selectMetadata;
            default:
                throw new RuntimeException("Unknown FetchType " + fetchType);
        }
    }

    /**
     * Deletes the row of the given message.
     *
     * @param messageId id of the message to delete
     * @return a future completing once the delete statement has been executed
     */
    public CompletableFuture<Void> delete(CassandraMessageId messageId) {
        return cassandraAsyncExecutor.executeVoid(delete.bind()
            .setUUID(MESSAGE_ID, messageId.get()));
    }

    /** Returns the function loading the content bytes required by the fetch type from a row. */
    private Function<Row, CompletableFuture<byte[]>> buildContentRetriever(FetchType fetchType) {
        switch (fetchType) {
            case Full:
                return this::getFullContent;
            case Headers:
                return this::getHeaderContent;
            case Body:
                // Headers are not loaded: the body is prefixed with BODY_START_OCTET zero bytes
                // so that it sits at the same offset as in the full content.
                return row -> getBodyContent(row)
                    .thenApply(data -> Bytes.concat(new byte[row.getInt(BODY_START_OCTET)], data));
            case Metadata:
                return row -> CompletableFuture.completedFuture(EMPTY_BYTE_ARRAY);
            default:
                throw new RuntimeException("Unknown FetchType " + fetchType);
        }
    }

    /** Loads header then body content and concatenates them. */
    private CompletableFuture<byte[]> getFullContent(Row row) {
        return CompletableFutureUtil.combine(
            getHeaderContent(row),
            getBodyContent(row),
            Bytes::concat);
    }

    private CompletableFuture<byte[]> getBodyContent(Row row) {
        return getFieldContent(BODY_CONTENT, row);
    }

    private CompletableFuture<byte[]> getHeaderContent(Row row) {
        return getFieldContent(HEADER_CONTENT, row);
    }

    /**
     * Loads the blob referenced by the given column.
     *
     * <p>The write path stores {@link #DEFAULT_OBJECT_VALUE} (null) when no blob id was returned
     * on save, so a null column is read back as empty content instead of being handed to
     * {@code BlobId.from}.
     */
    private CompletableFuture<byte[]> getFieldContent(String field, Row row) {
        return Optional.ofNullable(row.getString(field))
            .map(BlobId::from)
            .map(blobsDAO::read)
            .orElseGet(() -> CompletableFuture.completedFuture(EMPTY_BYTE_ARRAY));
    }

    /** Builds the result for an id that has no row. */
    public static MessageResult notFound(ComposedMessageIdWithMetaData id) {
        return new MessageResult(id, Optional.empty());
    }

    /** Builds the result for a message that was read, keyed by the message's own metadata. */
    public static MessageResult found(Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> message) {
        return new MessageResult(message.getLeft().getMetadata(), Optional.of(message));
    }

    /** Outcome of reading one message id: the id metadata plus the message when it was found. */
    public static class MessageResult {
        private final ComposedMessageIdWithMetaData metaData;
        private final Optional<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> message;

        public MessageResult(ComposedMessageIdWithMetaData metaData, Optional<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> message) {
            this.metaData = metaData;
            this.message = message;
        }

        public ComposedMessageIdWithMetaData getMetadata() {
            return metaData;
        }

        public boolean isFound() {
            return message.isPresent();
        }

        /**
         * Returns the message and its attachment references.
         *
         * @throws java.util.NoSuchElementException when the message was not found; check
         *         {@link #isFound()} first
         */
        public Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> message() {
            return message.orElseThrow(() ->
                new java.util.NoSuchElementException("No message available for " + metaData));
        }
    }
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/3b621a3d/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2Test.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2Test.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2Test.java new file mode 100644 index 0000000..a3ae0fe --- /dev/null +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV2Test.java @@ -0,0 +1,183 @@ +/**************************************************************** + * Licensed to the Apache
Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.cassandra.mail; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.ByteArrayInputStream; +import java.util.Date; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Stream; + +import javax.mail.Flags; +import javax.mail.util.SharedByteArrayInputStream; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.james.backends.cassandra.CassandraCluster; +import org.apache.james.backends.cassandra.init.CassandraModuleComposite; +import org.apache.james.mailbox.MessageUid; +import org.apache.james.mailbox.cassandra.CassandraId; +import org.apache.james.mailbox.cassandra.CassandraMessageId; +import org.apache.james.mailbox.cassandra.Limit; +import org.apache.james.mailbox.cassandra.modules.CassandraBlobModule; +import org.apache.james.mailbox.cassandra.modules.CassandraMessageModule; +import org.apache.james.mailbox.model.ComposedMessageId; +import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; +import 
org.apache.james.mailbox.model.MessageId; +import org.apache.james.mailbox.store.mail.MessageMapper; +import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder; +import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableList; +import com.google.common.primitives.Bytes; + +public class CassandraMessageDAOV2Test { + private static final int BODY_START = 16; + private static final CassandraId MAILBOX_ID = CassandraId.timeBased(); + private static final String CONTENT = "Subject: Test7 \n\nBody7\n.\n"; + private static final MessageUid messageUid = MessageUid.of(1); + + private CassandraCluster cassandra; + + private CassandraMessageDAOV2 testee; + private CassandraBlobsDAO blobsDAO; + private CassandraMessageId.Factory messageIdFactory; + + private SimpleMailboxMessage message; + private CassandraMessageId messageId; + private ComposedMessageId composedMessageId; + private List<ComposedMessageIdWithMetaData> messageIds; + + @Before + public void setUp() { + cassandra = CassandraCluster.create(new CassandraModuleComposite(new CassandraMessageModule(), new CassandraBlobModule())); + cassandra.ensureAllTables(); + + messageIdFactory = new CassandraMessageId.Factory(); + messageId = messageIdFactory.generate(); + blobsDAO = new CassandraBlobsDAO(cassandra.getConf()); + testee = new CassandraMessageDAOV2(cassandra.getConf(), cassandra.getTypesProvider(), blobsDAO); + + composedMessageId = new ComposedMessageId(MAILBOX_ID, messageId, messageUid); + + messageIds = ImmutableList.of(ComposedMessageIdWithMetaData.builder() + .composedMessageId(composedMessageId) + .flags(new Flags()) + .modSeq(1) + .build()); + } + + @After + public void tearDown() { + cassandra.clearAllTables(); + cassandra.close(); + } + + @Test + public void saveShouldSaveNullValueForTextualLineCountAsZero() throws 
Exception { + message = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder()); + + testee.save(message).join(); + + MessageWithoutAttachment attachmentRepresentation = + toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Metadata, Limit.unlimited())); + + assertThat(attachmentRepresentation.getPropertyBuilder().getTextualLineCount()) + .isEqualTo(0L); + } + + @Test + public void saveShouldSaveTextualLineCount() throws Exception { + long textualLineCount = 10L; + PropertyBuilder propertyBuilder = new PropertyBuilder(); + propertyBuilder.setTextualLineCount(textualLineCount); + message = createMessage(messageId, CONTENT, BODY_START, propertyBuilder); + + testee.save(message).join(); + + MessageWithoutAttachment attachmentRepresentation = + toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Metadata, Limit.unlimited())); + + assertThat(attachmentRepresentation.getPropertyBuilder().getTextualLineCount()).isEqualTo(textualLineCount); + } + + @Test + public void saveShouldStoreMessageWithFullContent() throws Exception { + message = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder()); + + testee.save(message).join(); + + MessageWithoutAttachment attachmentRepresentation = + toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Full, Limit.unlimited())); + + assertThat(IOUtils.toString(attachmentRepresentation.getContent(), Charsets.UTF_8)) + .isEqualTo(CONTENT); + } + + @Test + public void saveShouldStoreMessageWithBodyContent() throws Exception { + message = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder()); + + testee.save(message).join(); + + MessageWithoutAttachment attachmentRepresentation = + toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Body, Limit.unlimited())); + + byte[] expected = Bytes.concat( + new byte[BODY_START], + CONTENT.substring(BODY_START).getBytes(Charsets.UTF_8)); + 
assertThat(IOUtils.toString(attachmentRepresentation.getContent(), Charsets.UTF_8)) + .isEqualTo(IOUtils.toString(new ByteArrayInputStream(expected), Charsets.UTF_8)); + } + + @Test + public void saveShouldStoreMessageWithHeaderContent() throws Exception { + message = createMessage(messageId, CONTENT, BODY_START, new PropertyBuilder()); + + testee.save(message).join(); + + MessageWithoutAttachment attachmentRepresentation = + toMessage(testee.retrieveMessages(messageIds, MessageMapper.FetchType.Headers, Limit.unlimited())); + + assertThat(IOUtils.toString(attachmentRepresentation.getContent(), Charsets.UTF_8)) + .isEqualTo(CONTENT.substring(0, BODY_START)); + } + + private SimpleMailboxMessage createMessage(MessageId messageId, String content, int bodyStart, PropertyBuilder propertyBuilder) { + return new SimpleMailboxMessage(messageId, new Date(), content.length(), bodyStart, + new SharedByteArrayInputStream(content.getBytes(Charsets.UTF_8)), new Flags(), + propertyBuilder, MAILBOX_ID); + } + + private MessageWithoutAttachment toMessage(CompletableFuture<Stream<CassandraMessageDAOV2.MessageResult>> readOptional) throws InterruptedException, java.util.concurrent.ExecutionException { + return readOptional.join() + .map(CassandraMessageDAOV2.MessageResult::message) + .map(Pair::getLeft) + .findAny() + .orElseThrow(() -> new IllegalStateException("Collection is not supposed to be empty")); + } + +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org