JAMES-2608 implement CassandraMailRepositoryMailDaoV2 to handle several headers for each user
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/d0f973b1 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/d0f973b1 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/d0f973b1 Branch: refs/heads/master Commit: d0f973b1071bf4ae43a71dd5b61444ba0904be89 Parents: 7a2e552 Author: Matthieu Baechler <[email protected]> Authored: Wed Nov 28 18:41:07 2018 +0100 Committer: Raphael Ouazana <[email protected]> Committed: Wed Dec 19 09:24:12 2018 +0100 ---------------------------------------------------------------------- .../CassandraMailRepositoryMailDaoV2.java | 277 +++++++++++++++++++ .../CassandraMailRepositoryModule.java | 23 ++ .../cassandra/MailRepositoryTableV2.java | 49 ++++ .../CassandraMailRepositoryMailDAOTest.java | 16 ++ 4 files changed, 365 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/d0f973b1/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java new file mode 100644 index 0000000..10b44b1 --- /dev/null +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDaoV2.java @@ -0,0 +1,277 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailrepository.cassandra; + +import static com.datastax.driver.core.DataType.text; +import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; +import static com.datastax.driver.core.querybuilder.QueryBuilder.delete; +import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; +import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; +import static com.datastax.driver.core.querybuilder.QueryBuilder.select; +import static org.apache.james.mailrepository.cassandra.MailRepositoryTableV2.ATTRIBUTES; +import static org.apache.james.mailrepository.cassandra.MailRepositoryTableV2.BODY_BLOB_ID; +import static org.apache.james.mailrepository.cassandra.MailRepositoryTableV2.CONTENT_TABLE_NAME; +import static org.apache.james.mailrepository.cassandra.MailRepositoryTableV2.ERROR_MESSAGE; +import static org.apache.james.mailrepository.cassandra.MailRepositoryTableV2.HEADER_BLOB_ID; +import static org.apache.james.mailrepository.cassandra.MailRepositoryTableV2.HeaderEntrty.HEADER_NAME_INDEX; +import static org.apache.james.mailrepository.cassandra.MailRepositoryTableV2.HeaderEntrty.HEADER_VALUE_INDEX; +import static org.apache.james.mailrepository.cassandra.MailRepositoryTableV2.HeaderEntrty.USER_INDEX; +import static org.apache.james.mailrepository.cassandra.MailRepositoryTableV2.LAST_UPDATED; +import static org.apache.james.mailrepository.cassandra.MailRepositoryTableV2.MAIL_KEY; +import static org.apache.james.mailrepository.cassandra.MailRepositoryTableV2.MAIL_PROPERTIES; +import static org.apache.james.mailrepository.cassandra.MailRepositoryTableV2.MESSAGE_SIZE; +import static org.apache.james.mailrepository.cassandra.MailRepositoryTableV2.PER_RECIPIENT_SPECIFIC_HEADERS; +import static org.apache.james.mailrepository.cassandra.MailRepositoryTableV2.RECIPIENTS; +import static org.apache.james.mailrepository.cassandra.MailRepositoryTableV2.REMOTE_ADDR; +import static org.apache.james.mailrepository.cassandra.MailRepositoryTableV2.REMOTE_HOST; +import static org.apache.james.mailrepository.cassandra.MailRepositoryTableV2.REPOSITORY_NAME; +import static org.apache.james.mailrepository.cassandra.MailRepositoryTableV2.SENDER; +import static org.apache.james.mailrepository.cassandra.MailRepositoryTableV2.STATE; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import javax.inject.Inject; +import javax.mail.MessagingException; +import javax.mail.internet.AddressException; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; +import org.apache.james.blob.api.BlobId; +import org.apache.james.core.MailAddress; +import org.apache.james.mailrepository.api.MailKey; +import org.apache.james.mailrepository.api.MailRepositoryUrl; +import org.apache.james.server.core.MailImpl; +import org.apache.james.util.streams.Iterators; +import org.apache.mailet.Mail; +import org.apache.mailet.PerRecipientHeaders; +import org.apache.mailet.PerRecipientHeaders.Header; + +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.TupleType; +import com.datastax.driver.core.TupleValue; +import com.github.fge.lambdas.Throwing; +import com.github.steveash.guavate.Guavate; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +public class CassandraMailRepositoryMailDaoV2 implements CassandraMailRepositoryMailDaoAPI { + + private final CassandraAsyncExecutor executor; + private final PreparedStatement insertMail; + private final PreparedStatement deleteMail; + private final PreparedStatement selectMail; + private final BlobId.Factory blobIdFactory; + private final TupleType userHeaderNameHeaderValueTriple; + + @Inject + @VisibleForTesting + CassandraMailRepositoryMailDaoV2(Session session, BlobId.Factory blobIdFactory) { + this.executor = new CassandraAsyncExecutor(session); + + this.insertMail = prepareInsert(session); + this.deleteMail = prepareDelete(session); + this.selectMail = prepareSelect(session); + this.blobIdFactory = blobIdFactory; + this.userHeaderNameHeaderValueTriple = session.getCluster().getMetadata().newTupleType(text(), text(), text()); + } + + private PreparedStatement prepareDelete(Session session) { + return session.prepare(delete() + .from(CONTENT_TABLE_NAME) + .where(eq(REPOSITORY_NAME, bindMarker(REPOSITORY_NAME))) + .and(eq(MAIL_KEY, bindMarker(MAIL_KEY)))); + } + + private PreparedStatement prepareInsert(Session session) { + return session.prepare(insertInto(CONTENT_TABLE_NAME) + .value(REPOSITORY_NAME, bindMarker(REPOSITORY_NAME)) + .value(MAIL_KEY, bindMarker(MAIL_KEY)) + .value(MESSAGE_SIZE, bindMarker(MESSAGE_SIZE)) + .value(STATE, bindMarker(STATE)) + .value(SENDER, bindMarker(SENDER)) + .value(RECIPIENTS, bindMarker(RECIPIENTS)) + .value(ATTRIBUTES, bindMarker(ATTRIBUTES)) + .value(ERROR_MESSAGE, bindMarker(ERROR_MESSAGE)) + .value(REMOTE_ADDR, bindMarker(REMOTE_ADDR)) + .value(REMOTE_HOST, bindMarker(REMOTE_HOST)) + .value(LAST_UPDATED, bindMarker(LAST_UPDATED)) + .value(HEADER_BLOB_ID, bindMarker(HEADER_BLOB_ID)) + .value(BODY_BLOB_ID, bindMarker(BODY_BLOB_ID)) + .value(PER_RECIPIENT_SPECIFIC_HEADERS, bindMarker(PER_RECIPIENT_SPECIFIC_HEADERS))); + } + + private PreparedStatement prepareSelect(Session session) { + return session.prepare( + select(MAIL_PROPERTIES) + .from(CONTENT_TABLE_NAME) + .where(eq(REPOSITORY_NAME, bindMarker(REPOSITORY_NAME))) + .and(eq(MAIL_KEY, bindMarker(MAIL_KEY)))); + } + + public CompletableFuture<Void> store(MailRepositoryUrl url, Mail mail, BlobId headerId, BlobId bodyId) throws MessagingException { + return executor.executeVoid(insertMail.bind() + .setString(REPOSITORY_NAME, url.asString()) + .setString(MAIL_KEY, mail.getName()) + .setString(HEADER_BLOB_ID, headerId.asString()) + .setString(BODY_BLOB_ID, bodyId.asString()) + .setString(STATE, mail.getState()) + .setString(SENDER, mail.getMaybeSender().asString(null)) + .setList(RECIPIENTS, asStringList(mail.getRecipients())) + .setString(ERROR_MESSAGE, mail.getErrorMessage()) + .setString(REMOTE_ADDR, mail.getRemoteAddr()) + .setString(REMOTE_HOST, mail.getRemoteHost()) + .setLong(MESSAGE_SIZE, mail.getMessageSize()) + .setTimestamp(LAST_UPDATED, mail.getLastUpdated()) + .setMap(ATTRIBUTES, toRawAttributeMap(mail)) + .setList(PER_RECIPIENT_SPECIFIC_HEADERS, toTupleList(mail.getPerRecipientSpecificHeaders())) + ); + } + + public CompletableFuture<Void> remove(MailRepositoryUrl url, MailKey key) { + return executor.executeVoid(deleteMail.bind() + .setString(REPOSITORY_NAME, url.asString()) + .setString(MAIL_KEY, key.asString())); + } + + public CompletableFuture<Optional<MailDTO>> read(MailRepositoryUrl url, MailKey key) { + return executor.executeSingleRow(selectMail.bind() + .setString(REPOSITORY_NAME, url.asString()) + .setString(MAIL_KEY, key.asString())) + .thenApply(rowOptional -> rowOptional.map(this::toMail)); + } + + private MailDTO toMail(Row row) { + MailAddress sender = Optional.ofNullable(row.getString(SENDER)) + .map(MailAddress::getMailSender) + .orElse(null); + List<MailAddress> recipients = row.getList(RECIPIENTS, String.class) + .stream() + .map(Throwing.function(MailAddress::new)) + .collect(Guavate.toImmutableList()); + String state = row.getString(STATE); + String remoteAddr = row.getString(REMOTE_ADDR); + String remoteHost = row.getString(REMOTE_HOST); + String errorMessage = row.getString(ERROR_MESSAGE); + String name = row.getString(MAIL_KEY); + Date lastUpdated = row.getTimestamp(LAST_UPDATED); + Map<String, ByteBuffer> rawAttributes = row.getMap(ATTRIBUTES, String.class, ByteBuffer.class); + PerRecipientHeaders perRecipientHeaders = fromList(row.getList(PER_RECIPIENT_SPECIFIC_HEADERS, TupleValue.class)); + + MailImpl.Builder mailBuilder = MailImpl.builder() + .name(name) + .sender(sender) + .recipients(recipients) + .lastUpdated(lastUpdated) + .errorMessage(errorMessage) + .remoteHost(remoteHost) + .remoteAddr(remoteAddr) + .state(state) + .addAllHeadersForRecipients(perRecipientHeaders) + .attributes(toAttributes(rawAttributes)); + + return new MailDTO(mailBuilder, + blobIdFactory.from(row.getString(HEADER_BLOB_ID)), + blobIdFactory.from(row.getString(BODY_BLOB_ID))); + } + + private Map<String, Serializable> toAttributes(Map<String, ByteBuffer> rowAttributes) { + return rowAttributes.entrySet() + .stream() + .map(entry -> Pair.of(entry.getKey(), fromByteBuffer(entry.getValue()))) + .collect(Guavate.toImmutableMap(Pair::getLeft, Pair::getRight)); + } + + private ImmutableList<String> asStringList(Collection<MailAddress> mailAddresses) { + return mailAddresses.stream().map(MailAddress::asString).collect(Guavate.toImmutableList()); + } + + private ImmutableMap<String, ByteBuffer> toRawAttributeMap(Mail mail) { + return Iterators.toStream(mail.getAttributeNames()) + .map(name -> Pair.of(name, mail.getAttribute(name))) + .map(pair -> Pair.of(pair.getLeft(), toByteBuffer(pair.getRight()))) + .collect(Guavate.toImmutableMap(Pair::getLeft, Pair::getRight)); + } + + private ImmutableList<TupleValue> toTupleList(PerRecipientHeaders perRecipientHeaders) { + return perRecipientHeaders.getHeadersByRecipient() + .entries() + .stream() + .map(entry -> userHeaderNameHeaderValueTriple.newValue(entry.getKey().asString(), entry.getValue().getName(), entry.getValue().getValue())) + .collect(Guavate.toImmutableList()); + } + + private PerRecipientHeaders fromList(List<TupleValue> list) { + PerRecipientHeaders result = new PerRecipientHeaders(); + + list.forEach(tuple -> + result.addHeaderForRecipient( + Header.builder() + .name(tuple.getString(HEADER_NAME_INDEX)) + .value(tuple.getString(HEADER_VALUE_INDEX)) + .build(), + toMailAddress(tuple.getString(USER_INDEX)))); + return result; + } + + private ByteBuffer toByteBuffer(Serializable serializable) { + try { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + new ObjectOutputStream(outputStream).writeObject(serializable); + return ByteBuffer.wrap(outputStream.toByteArray()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private Serializable fromByteBuffer(ByteBuffer byteBuffer) { + try { + byte[] data = new byte[byteBuffer.remaining()]; + byteBuffer.get(data); + ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(data)); + return (Serializable) objectInputStream.readObject(); + } catch (IOException | ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + + private MailAddress toMailAddress(String rawValue) { + try { + return new MailAddress(rawValue); + } catch (AddressException e) { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/d0f973b1/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryModule.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryModule.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryModule.java index cfbcdf4..f2c44f4 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryModule.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryModule.java @@ -30,6 +30,10 @@ import static com.datastax.driver.core.schemabuilder.SchemaBuilder.frozen; import org.apache.james.backends.cassandra.components.CassandraModule; +import com.datastax.driver.core.CodecRegistry; +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.TupleType; + public interface CassandraMailRepositoryModule { CassandraModule MODULE = CassandraModule.builder() .type(MailRepositoryTable.HEADER_TYPE) @@ -64,5 +68,24 @@ public interface CassandraMailRepositoryModule { .addColumn(MailRepositoryTable.REMOTE_ADDR, text()) .addColumn(MailRepositoryTable.LAST_UPDATED, timestamp()) .addUDTMapColumn(MailRepositoryTable.PER_RECIPIENT_SPECIFIC_HEADERS, text(), frozen(MailRepositoryTable.HEADER_TYPE))) + .table(MailRepositoryTableV2.CONTENT_TABLE_NAME) + .comment("Stores the mails for a given repository. " + + "Content is stored with other blobs. " + + "This v2 version was introduced to support multiple headers for each user") + .statement(statement -> statement + .addPartitionKey(MailRepositoryTable.REPOSITORY_NAME, text()) + .addPartitionKey(MailRepositoryTable.MAIL_KEY, text()) + .addColumn(MailRepositoryTable.MESSAGE_SIZE, bigint()) + .addColumn(MailRepositoryTable.STATE, text()) + .addColumn(MailRepositoryTable.HEADER_BLOB_ID, text()) + .addColumn(MailRepositoryTable.BODY_BLOB_ID, text()) + .addColumn(MailRepositoryTable.ATTRIBUTES, map(text(), blob())) + .addColumn(MailRepositoryTable.ERROR_MESSAGE, text()) + .addColumn(MailRepositoryTable.SENDER, text()) + .addColumn(MailRepositoryTable.RECIPIENTS, list(text())) + .addColumn(MailRepositoryTable.REMOTE_HOST, text()) + .addColumn(MailRepositoryTable.REMOTE_ADDR, text()) + .addColumn(MailRepositoryTable.LAST_UPDATED, timestamp()) + .addColumn(MailRepositoryTable.PER_RECIPIENT_SPECIFIC_HEADERS, list(TupleType.of(ProtocolVersion.NEWEST_SUPPORTED, CodecRegistry.DEFAULT_INSTANCE, text(), text(), text())))) .build(); } http://git-wip-us.apache.org/repos/asf/james-project/blob/d0f973b1/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MailRepositoryTableV2.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MailRepositoryTableV2.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MailRepositoryTableV2.java new file mode 100644 index 0000000..54e2e45 --- /dev/null +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/MailRepositoryTableV2.java @@ -0,0 +1,49 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailrepository.cassandra; + +public interface MailRepositoryTableV2 { + String CONTENT_TABLE_NAME = "mailRepositoryContentV2"; + + String REPOSITORY_NAME = "name"; + + String MAIL_KEY = "mailKey"; + String MESSAGE_SIZE = "messageSize"; + String HEADER_BLOB_ID = "headerBlobId"; + String BODY_BLOB_ID = "bodyBlobId"; + String STATE = "state"; + String SENDER = "sender"; + String RECIPIENTS = "recipients"; + String ATTRIBUTES = "attributes"; + String ERROR_MESSAGE = "errorMessage"; + String REMOTE_HOST = "remoteHost"; + String REMOTE_ADDR = "remoteAddr"; + String LAST_UPDATED = "lastUpdated"; + String PER_RECIPIENT_SPECIFIC_HEADERS = "perRecipientSpecificHeaders"; + + interface HeaderEntrty { + int USER_INDEX = 0; + int HEADER_NAME_INDEX = 1; + int HEADER_VALUE_INDEX = 2; + } + + String[] MAIL_PROPERTIES = { MAIL_KEY, MESSAGE_SIZE, STATE, SENDER, RECIPIENTS, ATTRIBUTES, ERROR_MESSAGE, REMOTE_ADDR, + REMOTE_HOST, LAST_UPDATED, PER_RECIPIENT_SPECIFIC_HEADERS, HEADER_BLOB_ID, BODY_BLOB_ID }; +} http://git-wip-us.apache.org/repos/asf/james-project/blob/d0f973b1/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java index 582a7ba..c02966a 100644 --- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java +++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryMailDAOTest.java @@ -171,4 +171,20 @@ class CassandraMailRepositoryMailDAOTest { return testee; } } + + @Nested + class v2 extends TestSuite { + + private CassandraMailRepositoryMailDaoV2 testee; + + @BeforeEach + void setUp(CassandraCluster cassandra) { + testee = new CassandraMailRepositoryMailDaoV2(cassandra.getConf(), BLOB_ID_FACTORY); + } + + @Override + CassandraMailRepositoryMailDaoAPI testee() { + return testee; + } + } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
