This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 2b66fac96cc262640e895b6dea1f3e0ed2626312
Author: Benoit Tellier <[email protected]>
AuthorDate: Tue Oct 20 14:45:11 2020 +0700

    JAMES-3430 Implement CassandraMessageDAOV3
    
    Compared to V2 it offers a better structure for Properties (as column 
instead of as UDT list)
    resulting in occupied space decrease (1331 bytes per message -> 310 bytes 
per message on a simple
    benchmark), resulting in execution timing improvments (0.152ms -> 0.126ms 
at the table level, of
    course we will also benefits from transferring less bytes)
---
 .../cassandra/mail/CassandraMessageDAOV3.java      | 329 +++++++++++++++++++++
 .../cassandra/modules/CassandraMessageModule.java  |  27 ++
 .../cassandra/table/CassandraMessageV3Table.java   |  55 ++++
 .../cassandra/mail/CassandraMessageDAOV3Test.java  | 194 ++++++++++++
 4 files changed, 605 insertions(+)

diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
new file mode 100644
index 0000000..dc1b1c0
--- /dev/null
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
@@ -0,0 +1,329 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
+import static org.apache.james.blob.api.BlobStore.StoragePolicy.SIZE_BASED;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageIds.MESSAGE_ID;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.ATTACHMENTS;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.BODY_CONTENT;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.BODY_OCTECTS;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.BODY_START_OCTET;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.FULL_CONTENT_OCTETS;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.HEADER_CONTENT;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.INTERNAL_DATE;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.Properties.CONTENT_DESCRIPTION;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.Properties.CONTENT_DISPOSITION_PARAMETERS;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.Properties.CONTENT_DISPOSITION_TYPE;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.Properties.CONTENT_ID;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.Properties.CONTENT_LANGUAGE;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.Properties.CONTENT_LOCATION;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.Properties.CONTENT_MD5;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.Properties.CONTENT_TRANSFER_ENCODING;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.Properties.CONTENT_TYPE_PARAMETERS;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.Properties.MEDIA_TYPE;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.Properties.SUB_TYPE;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.TABLE_NAME;
+import static 
org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.TEXTUAL_LINE_COUNT;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import javax.inject.Inject;
+import javax.mail.util.SharedByteArrayInputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
+import 
org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration;
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
+import 
org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.Attachments;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.model.AttachmentId;
+import org.apache.james.mailbox.model.Cid;
+import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
+import org.apache.james.mailbox.model.MessageAttachmentMetadata;
+import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
+import org.apache.james.mailbox.store.mail.model.MailboxMessage;
+import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.UDTValue;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.github.steveash.guavate.Guavate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Bytes;
+
+import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+
+public class CassandraMessageDAOV3 {
+    public static final long DEFAULT_LONG_VALUE = 0L;
+    private static final byte[] EMPTY_BYTE_ARRAY = {};
+
+    private final CassandraAsyncExecutor cassandraAsyncExecutor;
+    private final CassandraTypesProvider typesProvider;
+    private final BlobStore blobStore;
+    private final BlobId.Factory blobIdFactory;
+    private final PreparedStatement insert;
+    private final PreparedStatement delete;
+    private final PreparedStatement select;
+    private final Cid.CidParser cidParser;
+    private final ConsistencyLevel consistencyLevel;
+
+    @Inject
+    public CassandraMessageDAOV3(Session session, CassandraTypesProvider 
typesProvider, BlobStore blobStore,
+                                 BlobId.Factory blobIdFactory,
+                                 CassandraConsistenciesConfiguration 
consistenciesConfiguration) {
+        this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
+        this.consistencyLevel = consistenciesConfiguration.getRegular();
+        this.typesProvider = typesProvider;
+        this.blobStore = blobStore;
+        this.blobIdFactory = blobIdFactory;
+
+        this.insert = prepareInsert(session);
+        this.delete = prepareDelete(session);
+        this.select = prepareSelect(session);
+        this.cidParser = Cid.parser().relaxed();
+    }
+
+    private PreparedStatement prepareSelect(Session session) {
+        return session.prepare(select()
+            .from(TABLE_NAME)
+            .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
+    }
+
+    private PreparedStatement prepareInsert(Session session) {
+        return session.prepare(insertInto(TABLE_NAME)
+            .value(MESSAGE_ID, bindMarker(MESSAGE_ID))
+            .value(INTERNAL_DATE, bindMarker(INTERNAL_DATE))
+            .value(BODY_START_OCTET, bindMarker(BODY_START_OCTET))
+            .value(FULL_CONTENT_OCTETS, bindMarker(FULL_CONTENT_OCTETS))
+            .value(BODY_OCTECTS, bindMarker(BODY_OCTECTS))
+            .value(BODY_CONTENT, bindMarker(BODY_CONTENT))
+            .value(HEADER_CONTENT, bindMarker(HEADER_CONTENT))
+            .value(CONTENT_DESCRIPTION, bindMarker(CONTENT_DESCRIPTION))
+            .value(CONTENT_DISPOSITION_TYPE, 
bindMarker(CONTENT_DISPOSITION_TYPE))
+            .value(MEDIA_TYPE, bindMarker(MEDIA_TYPE))
+            .value(SUB_TYPE, bindMarker(SUB_TYPE))
+            .value(CONTENT_ID, bindMarker(CONTENT_ID))
+            .value(CONTENT_MD5, bindMarker(CONTENT_MD5))
+            .value(CONTENT_TRANSFER_ENCODING, 
bindMarker(CONTENT_TRANSFER_ENCODING))
+            .value(CONTENT_LOCATION, bindMarker(CONTENT_LOCATION))
+            .value(CONTENT_LANGUAGE, bindMarker(CONTENT_LANGUAGE))
+            .value(CONTENT_DISPOSITION_PARAMETERS, 
bindMarker(CONTENT_DISPOSITION_PARAMETERS))
+            .value(CONTENT_TYPE_PARAMETERS, 
bindMarker(CONTENT_TYPE_PARAMETERS))
+            .value(TEXTUAL_LINE_COUNT, bindMarker(TEXTUAL_LINE_COUNT))
+            .value(ATTACHMENTS, bindMarker(ATTACHMENTS)));
+    }
+
+    private PreparedStatement prepareDelete(Session session) {
+        return session.prepare(QueryBuilder.delete()
+            .from(TABLE_NAME)
+            .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
+    }
+
+    public Mono<Void> save(MailboxMessage message) throws MailboxException {
+        return saveContent(message)
+            .flatMap(pair -> 
cassandraAsyncExecutor.executeVoid(boundWriteStatement(message, pair)));
+    }
+
+    private Mono<Tuple2<BlobId, BlobId>> saveContent(MailboxMessage message) 
throws MailboxException {
+        try {
+            byte[] headerContent = 
IOUtils.toByteArray(message.getHeaderContent());
+            byte[] bodyContent = IOUtils.toByteArray(message.getBodyContent());
+
+            Mono<BlobId> bodyFuture = 
Mono.from(blobStore.save(blobStore.getDefaultBucketName(), bodyContent, 
LOW_COST));
+            Mono<BlobId> headerFuture = 
Mono.from(blobStore.save(blobStore.getDefaultBucketName(), headerContent, 
SIZE_BASED));
+
+            return headerFuture.zipWith(bodyFuture);
+        } catch (IOException e) {
+            throw new MailboxException("Error saving mail content", e);
+        }
+    }
+
+    private BoundStatement boundWriteStatement(MailboxMessage message, 
Tuple2<BlobId, BlobId> pair) {
+        CassandraMessageId messageId = (CassandraMessageId) 
message.getMessageId();
+        PropertyBuilder propertyBuilder = new 
PropertyBuilder(message.getProperties());
+        return insert.bind()
+            .setUUID(MESSAGE_ID, messageId.get())
+            .setTimestamp(INTERNAL_DATE, message.getInternalDate())
+            .setInt(BODY_START_OCTET, (int) (message.getHeaderOctets()))
+            .setLong(FULL_CONTENT_OCTETS, message.getFullContentOctets())
+            .setLong(BODY_OCTECTS, message.getBodyOctets())
+            .setString(BODY_CONTENT, pair.getT2().asString())
+            .setString(HEADER_CONTENT, pair.getT1().asString())
+            .setLong(TEXTUAL_LINE_COUNT, 
Optional.ofNullable(message.getTextualLineCount()).orElse(DEFAULT_LONG_VALUE))
+            .setString(CONTENT_DESCRIPTION, 
propertyBuilder.getContentDescription())
+            .setString(CONTENT_DISPOSITION_TYPE, 
propertyBuilder.getContentDispositionType())
+            .setString(MEDIA_TYPE, propertyBuilder.getMediaType())
+            .setString(SUB_TYPE, propertyBuilder.getSubType())
+            .setString(CONTENT_ID, propertyBuilder.getContentID())
+            .setString(CONTENT_MD5, propertyBuilder.getContentMD5())
+            .setString(CONTENT_TRANSFER_ENCODING, 
propertyBuilder.getContentTransferEncoding())
+            .setString(CONTENT_LOCATION, propertyBuilder.getContentLocation())
+            .setList(CONTENT_LANGUAGE, propertyBuilder.getContentLanguage())
+            .setMap(CONTENT_DISPOSITION_PARAMETERS, 
propertyBuilder.getContentDispositionParameters())
+            .setMap(CONTENT_TYPE_PARAMETERS, 
propertyBuilder.getContentTypeParameters())
+            .setList(ATTACHMENTS, buildAttachmentUdt(message));
+    }
+
+    private ImmutableList<UDTValue> buildAttachmentUdt(MailboxMessage message) 
{
+        return message.getAttachments().stream()
+            .map(this::toUDT)
+            .collect(Guavate.toImmutableList());
+    }
+
+    private UDTValue toUDT(MessageAttachmentMetadata messageAttachment) {
+        UDTValue result = typesProvider.getDefinedUserType(ATTACHMENTS)
+            .newValue()
+            .setString(Attachments.ID, 
messageAttachment.getAttachmentId().getId())
+            .setBool(Attachments.IS_INLINE, messageAttachment.isInline());
+        messageAttachment.getName()
+            .ifPresent(name -> result.setString(Attachments.NAME, name));
+        messageAttachment.getCid()
+            .ifPresent(cid -> result.setString(Attachments.CID, 
cid.getValue()));
+        return result;
+    }
+
+    public Mono<MessageRepresentation> 
retrieveMessage(ComposedMessageIdWithMetaData id, FetchType fetchType) {
+        CassandraMessageId cassandraMessageId = (CassandraMessageId) 
id.getComposedMessageId().getMessageId();
+        return retrieveMessage(cassandraMessageId, fetchType);
+    }
+
+    public Mono<MessageRepresentation> retrieveMessage(CassandraMessageId 
cassandraMessageId, FetchType fetchType) {
+        return retrieveRow(cassandraMessageId)
+                .flatMap(resultSet -> message(resultSet, cassandraMessageId, 
fetchType));
+    }
+
+    private Mono<ResultSet> retrieveRow(CassandraMessageId messageId) {
+        return cassandraAsyncExecutor.execute(select
+            .bind()
+            .setUUID(MESSAGE_ID, messageId.get())
+            .setConsistencyLevel(consistencyLevel));
+    }
+
+    private Mono<MessageRepresentation>
+    message(ResultSet rows, CassandraMessageId cassandraMessageId, FetchType 
fetchType) {
+        if (rows.isExhausted()) {
+            return Mono.empty();
+        }
+
+        Row row = rows.one();
+        BlobId headerId = retrieveBlobId(HEADER_CONTENT, row);
+        BlobId bodyId = retrieveBlobId(BODY_CONTENT, row);
+        int bodyStartOctet = row.getInt(BODY_START_OCTET);
+
+        return buildContentRetriever(fetchType, headerId, bodyId, 
bodyStartOctet).map(content ->
+            new MessageRepresentation(
+                cassandraMessageId,
+                row.getTimestamp(INTERNAL_DATE),
+                row.getLong(FULL_CONTENT_OCTETS),
+                row.getInt(BODY_START_OCTET),
+                new SharedByteArrayInputStream(content),
+                getPropertyBuilder(row),
+                getAttachments(row).collect(Guavate.toImmutableList()),
+                headerId,
+                bodyId));
+    }
+
+    private PropertyBuilder getPropertyBuilder(Row row) {
+        PropertyBuilder property = new PropertyBuilder();
+        property.setContentDescription(row.getString(CONTENT_DESCRIPTION));
+        
property.setContentDispositionType(row.getString(CONTENT_DISPOSITION_TYPE));
+        property.setMediaType(row.getString(MEDIA_TYPE));
+        property.setSubType(row.getString(SUB_TYPE));
+        property.setContentID(row.getString(CONTENT_ID));
+        property.setContentMD5(row.getString(CONTENT_MD5));
+        
property.setContentTransferEncoding(row.getString(CONTENT_TRANSFER_ENCODING));
+        property.setContentLocation(row.getString(CONTENT_LOCATION));
+        property.setContentLanguage(row.getList(CONTENT_LANGUAGE, 
String.class));
+        
property.setContentDispositionParameters(row.getMap(CONTENT_DISPOSITION_PARAMETERS,
 String.class, String.class));
+        property.setContentTypeParameters(row.getMap(CONTENT_TYPE_PARAMETERS, 
String.class, String.class));
+        property.setTextualLineCount(row.getLong(TEXTUAL_LINE_COUNT));
+        return property;
+    }
+
+    private Stream<MessageAttachmentRepresentation> getAttachments(Row row) {
+        List<UDTValue> udtValues = row.getList(ATTACHMENTS, UDTValue.class);
+        return attachmentByIds(udtValues);
+    }
+
+    private Stream<MessageAttachmentRepresentation> 
attachmentByIds(List<UDTValue> udtValues) {
+        return udtValues.stream()
+            .map(this::messageAttachmentByIdFrom);
+    }
+
+    private MessageAttachmentRepresentation messageAttachmentByIdFrom(UDTValue 
udtValue) {
+        return MessageAttachmentRepresentation.builder()
+            
.attachmentId(AttachmentId.from(udtValue.getString(Attachments.ID)))
+            .name(udtValue.getString(Attachments.NAME))
+            .cid(cidParser.parse(udtValue.getString(Attachments.CID)))
+            .isInline(udtValue.getBool(Attachments.IS_INLINE))
+            .build();
+    }
+
+    public Mono<Void> delete(CassandraMessageId messageId) {
+        return cassandraAsyncExecutor.executeVoid(delete.bind()
+            .setUUID(MESSAGE_ID, messageId.get()));
+    }
+
+    private Mono<byte[]> buildContentRetriever(FetchType fetchType, BlobId 
headerId, BlobId bodyId, int bodyStartOctet) {
+        switch (fetchType) {
+            case Full:
+                return getFullContent(headerId, bodyId);
+            case Headers:
+                return getContent(headerId);
+            case Body:
+                return getContent(bodyId)
+                    .map(data -> Bytes.concat(new byte[bodyStartOctet], data));
+            case Metadata:
+                return Mono.just(EMPTY_BYTE_ARRAY);
+            default:
+                throw new RuntimeException("Unknown FetchType " + fetchType);
+        }
+    }
+
+    private Mono<byte[]> getFullContent(BlobId headerId, BlobId bodyId) {
+        return getContent(headerId)
+            .zipWith(getContent(bodyId), Bytes::concat);
+    }
+
+    private Mono<byte[]> getContent(BlobId blobId) {
+        return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), 
blobId));
+    }
+
+    private BlobId retrieveBlobId(String field, Row row) {
+        return blobIdFactory.from(row.getString(field));
+    }
+}
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraMessageModule.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraMessageModule.java
index 280eee0..e7c36c6 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraMessageModule.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraMessageModule.java
@@ -22,6 +22,8 @@ package org.apache.james.mailbox.cassandra.modules;
 import static com.datastax.driver.core.DataType.bigint;
 import static com.datastax.driver.core.DataType.cboolean;
 import static com.datastax.driver.core.DataType.cint;
+import static com.datastax.driver.core.DataType.frozenList;
+import static com.datastax.driver.core.DataType.frozenMap;
 import static com.datastax.driver.core.DataType.set;
 import static com.datastax.driver.core.DataType.text;
 import static com.datastax.driver.core.DataType.timestamp;
@@ -31,6 +33,7 @@ import 
org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.mailbox.cassandra.table.CassandraMessageIdTable;
 import org.apache.james.mailbox.cassandra.table.CassandraMessageIds;
 import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table;
+import org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table;
 import org.apache.james.mailbox.cassandra.table.Flag;
 import org.apache.james.mailbox.cassandra.table.MessageIdToImapUid;
 
@@ -94,6 +97,30 @@ public interface CassandraMessageModule {
             .addColumn(CassandraMessageV2Table.HEADER_CONTENT, text())
             .addUDTListColumn(CassandraMessageV2Table.ATTACHMENTS, 
SchemaBuilder.frozen(CassandraMessageV2Table.ATTACHMENTS))
             .addUDTListColumn(CassandraMessageV2Table.PROPERTIES, 
SchemaBuilder.frozen(CassandraMessageV2Table.PROPERTIES)))
+        .table(CassandraMessageV3Table.TABLE_NAME)
+        .comment("Holds message metadata, independently of any mailboxes. 
Content of messages is stored " +
+            "in `blobs` and `blobparts` tables. Optimizes property storage 
compared to V2.")
+        .statement(statement -> statement
+            .addPartitionKey(CassandraMessageIds.MESSAGE_ID, timeuuid())
+            .addColumn(CassandraMessageV3Table.INTERNAL_DATE, timestamp())
+            .addColumn(CassandraMessageV3Table.BODY_START_OCTET, cint())
+            .addColumn(CassandraMessageV3Table.BODY_OCTECTS, bigint())
+            .addColumn(CassandraMessageV3Table.TEXTUAL_LINE_COUNT, bigint())
+            .addColumn(CassandraMessageV3Table.FULL_CONTENT_OCTETS, bigint())
+            .addColumn(CassandraMessageV3Table.BODY_CONTENT, text())
+            .addColumn(CassandraMessageV3Table.HEADER_CONTENT, text())
+            .addColumn(CassandraMessageV3Table.Properties.CONTENT_DESCRIPTION, 
text())
+            
.addColumn(CassandraMessageV3Table.Properties.CONTENT_DISPOSITION_TYPE, text())
+            .addColumn(CassandraMessageV3Table.Properties.MEDIA_TYPE, text())
+            .addColumn(CassandraMessageV3Table.Properties.SUB_TYPE, text())
+            .addColumn(CassandraMessageV3Table.Properties.CONTENT_ID, text())
+            .addColumn(CassandraMessageV3Table.Properties.CONTENT_MD5, text())
+            
.addColumn(CassandraMessageV3Table.Properties.CONTENT_TRANSFER_ENCODING, text())
+            .addColumn(CassandraMessageV3Table.Properties.CONTENT_LOCATION, 
text())
+            .addColumn(CassandraMessageV3Table.Properties.CONTENT_LANGUAGE, 
frozenList(text()))
+            
.addColumn(CassandraMessageV3Table.Properties.CONTENT_DISPOSITION_PARAMETERS, 
frozenMap(text(), text()))
+            
.addColumn(CassandraMessageV3Table.Properties.CONTENT_TYPE_PARAMETERS, 
frozenMap(text(), text()))
+            .addUDTListColumn(CassandraMessageV2Table.ATTACHMENTS, 
SchemaBuilder.frozen(CassandraMessageV3Table.ATTACHMENTS)))
         .type(CassandraMessageV2Table.PROPERTIES)
         .statement(statement -> statement
             .addColumn(CassandraMessageV2Table.Properties.NAMESPACE, text())
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageV3Table.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageV3Table.java
new file mode 100644
index 0000000..b87d074
--- /dev/null
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageV3Table.java
@@ -0,0 +1,55 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.table;
+
+public interface CassandraMessageV3Table {
+
+    String TABLE_NAME = "messageV3";
+    String INTERNAL_DATE = "internalDate";
+    String BODY_START_OCTET = "bodyStartOctet";
+    String FULL_CONTENT_OCTETS = "fullContentOctets";
+    String BODY_OCTECTS = "bodyOctets";
+    String TEXTUAL_LINE_COUNT = "textualLineCount";
+    String BODY_CONTENT = "bodyContent";
+    String HEADER_CONTENT = "headerContent";
+    String ATTACHMENTS = "attachments";
+
+    interface Properties {
+        String MEDIA_TYPE = "mediaType";
+        String SUB_TYPE = "subType";
+        String CONTENT_ID = "contentId";
+        String CONTENT_LOCATION = "contentLocation";
+        String CONTENT_DESCRIPTION = "contentDescription";
+        String CONTENT_TRANSFER_ENCODING = "contentTransferEncoding";
+        String CONTENT_DISPOSITION_TYPE = "contentDispositionType";
+        String CONTENT_DISPOSITION_PARAMETERS = "contentDispositionParameters";
+        String CONTENT_TYPE_PARAMETERS = "contentTypeParameters";
+        String CONTENT_MD5 = "contentMd5";
+        String CONTENT_LANGUAGE = "contentLanguage";
+    }
+
+    interface Attachments {
+        String ID = "id";
+        String NAME = "name";
+        String CID = "cid";
+        String IS_INLINE = "isInline";
+    }
+
+}
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3Test.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3Test.java
new file mode 100644
index 0000000..ab6e594
--- /dev/null
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3Test.java
@@ -0,0 +1,194 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.mailbox.cassandra.mail;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+
+import javax.mail.Flags;
+import javax.mail.util.SharedByteArrayInputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import 
org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.cassandra.CassandraBlobModule;
+import org.apache.james.blob.cassandra.CassandraBlobStoreFactory;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.ModSeq;
+import org.apache.james.mailbox.cassandra.ids.CassandraId;
+import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
+import org.apache.james.mailbox.cassandra.modules.CassandraMessageModule;
+import org.apache.james.mailbox.model.ComposedMessageId;
+import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
+import org.apache.james.mailbox.model.MessageAttachmentMetadata;
+import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.store.mail.MessageMapper;
+import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
+import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Bytes;
+
+import reactor.core.publisher.Mono;
+
+class CassandraMessageDAOV3Test {
+    private static final int BODY_START = 16;
+    private static final CassandraId MAILBOX_ID = CassandraId.timeBased();
+    private static final String CONTENT = "Subject: Test7 \n\nBody7\n.\n";
+    private static final MessageUid messageUid = MessageUid.of(1);
+    private static final List<MessageAttachmentMetadata> NO_ATTACHMENT = 
ImmutableList.of();
+
+    public static final CassandraModule MODULES = 
CassandraModule.aggregateModules(
+            CassandraMessageModule.MODULE,
+            CassandraBlobModule.MODULE,
+            CassandraSchemaVersionModule.MODULE);
+
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new 
CassandraClusterExtension(
+            MODULES);
+
+    private CassandraMessageDAOV3 testee;
+
+    private SimpleMailboxMessage message;
+    private CassandraMessageId messageId;
+    private ComposedMessageIdWithMetaData messageIdWithMetadata;
+
+    @BeforeEach
+    void setUp(CassandraCluster cassandra) {
+        CassandraMessageId.Factory messageIdFactory = new 
CassandraMessageId.Factory();
+        messageId = messageIdFactory.generate();
+        BlobStore blobStore = 
CassandraBlobStoreFactory.forTesting(cassandra.getConf())
+            .passthrough();
+        HashBlobId.Factory blobIdFactory = new HashBlobId.Factory();
+        testee = new CassandraMessageDAOV3(
+            cassandra.getConf(),
+            cassandra.getTypesProvider(),
+            blobStore,
+            blobIdFactory,
+            cassandraCluster.getCassandraConsistenciesConfiguration());
+
+        messageIdWithMetadata = ComposedMessageIdWithMetaData.builder()
+                .composedMessageId(new ComposedMessageId(MAILBOX_ID, 
messageId, messageUid))
+                .flags(new Flags())
+                .modSeq(ModSeq.of(1))
+                .build();
+    }
+
+    @Test
+    void saveShouldSaveNullValueForTextualLineCountAsZero() throws Exception {
+        message = createMessage(messageId, CONTENT, BODY_START, new 
PropertyBuilder(), NO_ATTACHMENT);
+
+        testee.save(message).block();
+
+        MessageRepresentation attachmentRepresentation =
+            toMessage(testee.retrieveMessage(messageIdWithMetadata, 
MessageMapper.FetchType.Metadata));
+
+        
assertThat(attachmentRepresentation.getProperties().getTextualLineCount())
+            .isEqualTo(0L);
+    }
+
+    @Test
+    void saveShouldSaveTextualLineCount() throws Exception {
+        long textualLineCount = 10L;
+        PropertyBuilder propertyBuilder = new PropertyBuilder();
+        propertyBuilder.setTextualLineCount(textualLineCount);
+        message = createMessage(messageId, CONTENT, BODY_START, 
propertyBuilder, NO_ATTACHMENT);
+
+        testee.save(message).block();
+
+        MessageRepresentation attachmentRepresentation =
+            toMessage(testee.retrieveMessage(messageIdWithMetadata, 
MessageMapper.FetchType.Metadata));
+
+        
assertThat(attachmentRepresentation.getProperties().getTextualLineCount()).isEqualTo(textualLineCount);
+    }
+
+    @Test
+    void saveShouldStoreMessageWithFullContent() throws Exception {
+        message = createMessage(messageId, CONTENT, BODY_START, new 
PropertyBuilder(), NO_ATTACHMENT);
+
+        testee.save(message).block();
+
+        MessageRepresentation attachmentRepresentation =
+            toMessage(testee.retrieveMessage(messageIdWithMetadata, 
MessageMapper.FetchType.Full));
+
+        assertThat(IOUtils.toString(attachmentRepresentation.getContent(), 
StandardCharsets.UTF_8))
+            .isEqualTo(CONTENT);
+    }
+
+    @Test
+    void saveShouldStoreMessageWithBodyContent() throws Exception {
+        message = createMessage(messageId, CONTENT, BODY_START, new 
PropertyBuilder(), NO_ATTACHMENT);
+
+        testee.save(message).block();
+
+        MessageRepresentation attachmentRepresentation =
+            toMessage(testee.retrieveMessage(messageIdWithMetadata, 
MessageMapper.FetchType.Body));
+
+        byte[] expected = Bytes.concat(
+            new byte[BODY_START],
+            CONTENT.substring(BODY_START).getBytes(StandardCharsets.UTF_8));
+        assertThat(IOUtils.toString(attachmentRepresentation.getContent(), 
StandardCharsets.UTF_8))
+            .isEqualTo(IOUtils.toString(new ByteArrayInputStream(expected), 
StandardCharsets.UTF_8));
+    }
+
+    @Test
+    void saveShouldStoreMessageWithHeaderContent() throws Exception {
+        message = createMessage(messageId, CONTENT, BODY_START, new 
PropertyBuilder(), NO_ATTACHMENT);
+
+        testee.save(message).block();
+
+        MessageRepresentation attachmentRepresentation =
+            toMessage(testee.retrieveMessage(messageIdWithMetadata, 
MessageMapper.FetchType.Headers));
+
+        assertThat(IOUtils.toString(attachmentRepresentation.getContent(), 
StandardCharsets.UTF_8))
+            .isEqualTo(CONTENT.substring(0, BODY_START));
+    }
+
+    private SimpleMailboxMessage createMessage(MessageId messageId, String 
content, int bodyStart, PropertyBuilder propertyBuilder, 
Collection<MessageAttachmentMetadata> attachments) {
+        return SimpleMailboxMessage.builder()
+            .messageId(messageId)
+            .mailboxId(MAILBOX_ID)
+            .uid(messageUid)
+            .internalDate(new Date())
+            .bodyStartOctet(bodyStart)
+            .size(content.length())
+            .content(new 
SharedByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)))
+            .flags(new Flags())
+            .properties(propertyBuilder)
+            .addAttachments(attachments)
+            .build();
+    }
+
+    private MessageRepresentation toMessage(Mono<MessageRepresentation> read) {
+        return read.blockOptional()
+            .orElseThrow(() -> new IllegalStateException("Collection is not 
supposed to be empty"));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to