This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 1788610662dc936d037399ea6f03a3ad48658168 Author: Benoit Tellier <[email protected]> AuthorDate: Sun Jul 28 16:38:45 2019 +0200 JAMES-2850 FIX RabbitMQ MailQueue with multiple specific headers for a single recipient --- .../apache/james/queue/api/MailQueueContract.java | 22 ++++++++++++ .../cassandra/CassandraMailQueueViewModule.java | 23 ++++++------ .../rabbitmq/view/cassandra/EnqueuedMailsDAO.java | 14 ++++---- .../view/cassandra/EnqueuedMailsDaoUtil.java | 42 ++++++++++------------ .../RabbitMQMailQueueConfigurationChangeTest.java | 1 - .../queue/rabbitmq/RabbitMQMailQueueTest.java | 2 +- .../CassandraMailQueueViewTestFactory.java | 4 +-- .../view/cassandra/EnqueuedMailsDaoTest.java | 2 +- 8 files changed, 63 insertions(+), 47 deletions(-) diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java index a6492cc..429f067 100644 --- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java +++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java @@ -281,6 +281,28 @@ public interface MailQueueContract { } @Test + default void queueShouldPreserveMultiplePerRecipientHeaders() throws Exception { + PerRecipientHeaders.Header header = PerRecipientHeaders.Header.builder() + .name("any") + .value("any") + .build(); + PerRecipientHeaders.Header header2 = PerRecipientHeaders.Header.builder() + .name("any2") + .value("any") + .build(); + enQueue(defaultMail() + .name("mail") + .addHeaderForRecipient(header, RECIPIENT1) + .addHeaderForRecipient(header2, RECIPIENT1) + .build()); + + MailQueue.MailQueueItem mailQueueItem = Flux.from(getMailQueue().deQueue()).blockFirst(); + assertThat(mailQueueItem.getMail().getPerRecipientSpecificHeaders() + .getHeadersForRecipient(RECIPIENT1)) + .containsOnly(header, header2); + } + + @Test default void queueShouldPreserveNonStringMailAttribute() throws Exception { Attribute attribute = Attribute.convertToAttribute("any", new SerializableAttribute("value")); enQueue(defaultMail() diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewModule.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewModule.java index a160b9b..6f109fd 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewModule.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewModule.java @@ -26,14 +26,17 @@ import static com.datastax.driver.core.DataType.map; import static com.datastax.driver.core.DataType.text; import static com.datastax.driver.core.DataType.timestamp; import static com.datastax.driver.core.DataType.uuid; -import static com.datastax.driver.core.schemabuilder.SchemaBuilder.frozen; import org.apache.james.backends.cassandra.components.CassandraModule; +import com.datastax.driver.core.CodecRegistry; +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.TupleType; + public interface CassandraMailQueueViewModule { interface EnqueuedMailsTable { - String TABLE_NAME = "enqueuedMailsV2"; + String TABLE_NAME = "enqueuedMailsV3"; String QUEUE_NAME = "queueName"; String TIME_RANGE_START = "timeRangeStart"; @@ -53,10 +56,6 @@ public interface CassandraMailQueueViewModule { String REMOTE_ADDR = "remoteAddr"; String LAST_UPDATED = "lastUpdated"; String PER_RECIPIENT_SPECIFIC_HEADERS = "perRecipientSpecificHeaders"; - - String HEADER_TYPE = "enqueuedMailHeaders"; - String HEADER_NAME = "headerName"; - String HEADER_VALUE = "headerValue"; } interface BrowseStartTable { @@ -73,11 +72,13 @@ public interface CassandraMailQueueViewModule { String ENQUEUE_ID = "enqueueId"; } + interface HeaderEntry { + int USER_INDEX = 0; + int HEADER_NAME_INDEX = 1; + int HEADER_VALUE_INDEX = 2; + } + CassandraModule MODULE = CassandraModule - .type(EnqueuedMailsTable.HEADER_TYPE) - .statement(statement -> statement - .addColumn(EnqueuedMailsTable.HEADER_NAME, text()) - .addColumn(EnqueuedMailsTable.HEADER_VALUE, text())) .table(EnqueuedMailsTable.TABLE_NAME) .comment("store enqueued mails, if a mail is enqueued into a mail queue, it also being stored in this table," + " when a mail is dequeued from a mail queue, the record associated with that mail still available in this" + @@ -101,7 +102,7 @@ public interface CassandraMailQueueViewModule { .addColumn(EnqueuedMailsTable.REMOTE_HOST, text()) .addColumn(EnqueuedMailsTable.REMOTE_ADDR, text()) .addColumn(EnqueuedMailsTable.LAST_UPDATED, timestamp()) - .addUDTMapColumn(EnqueuedMailsTable.PER_RECIPIENT_SPECIFIC_HEADERS, text(), frozen(EnqueuedMailsTable.HEADER_TYPE))) + .addColumn(EnqueuedMailsTable.PER_RECIPIENT_SPECIFIC_HEADERS, list(TupleType.of(ProtocolVersion.NEWEST_SUPPORTED, CodecRegistry.DEFAULT_INSTANCE, text(), text(), text())))) .table(BrowseStartTable.TABLE_NAME) .comment("this table allows to find the starting point of iteration from the table: " diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java index 55f0e43..e86c4f3 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDAO.java @@ -19,6 +19,7 @@ package org.apache.james.queue.rabbitmq.view.cassandra; +import static com.datastax.driver.core.DataType.text; import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; @@ -42,14 +43,13 @@ import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueV import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.TABLE_NAME; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.TIME_RANGE_START; import static org.apache.james.queue.rabbitmq.view.cassandra.EnqueuedMailsDaoUtil.asStringList; -import static org.apache.james.queue.rabbitmq.view.cassandra.EnqueuedMailsDaoUtil.toHeaderMap; import static org.apache.james.queue.rabbitmq.view.cassandra.EnqueuedMailsDaoUtil.toRawAttributeMap; +import static org.apache.james.queue.rabbitmq.view.cassandra.EnqueuedMailsDaoUtil.toTupleList; import java.util.Date; import javax.inject.Inject; -import org.apache.james.backends.cassandra.init.CassandraTypesProvider; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.mail.MimeMessagePartsId; @@ -62,6 +62,7 @@ import org.apache.mailet.Mail; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Session; +import com.datastax.driver.core.TupleType; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -71,18 +72,17 @@ public class EnqueuedMailsDAO { private final CassandraAsyncExecutor executor; private final PreparedStatement selectFrom; private final PreparedStatement insert; - private final CassandraTypesProvider cassandraTypesProvider; private final BlobId.Factory blobFactory; + private final TupleType userHeaderNameHeaderValueTriple; @Inject - EnqueuedMailsDAO(Session session, CassandraTypesProvider cassandraTypesProvider, - BlobId.Factory blobIdFactory) { + EnqueuedMailsDAO(Session session, BlobId.Factory blobIdFactory) { this.executor = new CassandraAsyncExecutor(session); - this.cassandraTypesProvider = cassandraTypesProvider; this.selectFrom = prepareSelectFrom(session); this.insert = prepareInsert(session); this.blobFactory = blobIdFactory; + this.userHeaderNameHeaderValueTriple = session.getCluster().getMetadata().newTupleType(text(), text(), text()); } private PreparedStatement prepareSelectFrom(Session session) { @@ -137,7 +137,7 @@ public class EnqueuedMailsDAO { .setString(REMOTE_HOST, mail.getRemoteHost()) .setTimestamp(LAST_UPDATED, mail.getLastUpdated()) .setMap(ATTRIBUTES, toRawAttributeMap(mail)) - .setMap(PER_RECIPIENT_SPECIFIC_HEADERS, toHeaderMap(cassandraTypesProvider, mail.getPerRecipientSpecificHeaders()))); + .setList(PER_RECIPIENT_SPECIFIC_HEADERS, toTupleList(userHeaderNameHeaderValueTriple, mail.getPerRecipientSpecificHeaders()))); } Flux<EnqueuedItemWithSlicingContext> selectEnqueuedMails( diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoUtil.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoUtil.java index 04c6cec..7d16d15 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoUtil.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoUtil.java @@ -26,9 +26,6 @@ import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueV import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ENQUEUE_ID; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.ERROR_MESSAGE; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_BLOB_ID; -import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_NAME; -import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_TYPE; -import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.HEADER_VALUE; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.LAST_UPDATED; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.NAME; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.PER_RECIPIENT_SPECIFIC_HEADERS; @@ -39,6 +36,9 @@ import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueV import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.SENDER; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.STATE; import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.EnqueuedMailsTable.TIME_RANGE_START; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.HeaderEntry.HEADER_NAME_INDEX; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.HeaderEntry.HEADER_VALUE_INDEX; +import static org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule.HeaderEntry.USER_INDEX; import java.io.IOException; import java.io.UncheckedIOException; @@ -54,7 +54,6 @@ import java.util.Optional; import javax.mail.internet.AddressException; import org.apache.commons.lang3.tuple.Pair; -import org.apache.james.backends.cassandra.init.CassandraTypesProvider; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.mail.MimeMessagePartsId; import org.apache.james.core.MailAddress; @@ -71,8 +70,10 @@ import org.apache.mailet.Mail; import org.apache.mailet.PerRecipientHeaders; import com.datastax.driver.core.Row; -import com.datastax.driver.core.UDTValue; +import com.datastax.driver.core.TupleType; +import com.datastax.driver.core.TupleValue; import com.github.fge.lambdas.Throwing; +import com.github.steveash.guavate.Guavate; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -107,7 +108,7 @@ public class EnqueuedMailsDaoUtil { String name = row.getString(NAME); Date lastUpdated = row.getTimestamp(LAST_UPDATED); Map<String, ByteBuffer> rawAttributes = row.getMap(ATTRIBUTES, String.class, ByteBuffer.class); - PerRecipientHeaders perRecipientHeaders = fromHeaderMap(row.getMap(PER_RECIPIENT_SPECIFIC_HEADERS, String.class, UDTValue.class)); + PerRecipientHeaders perRecipientHeaders = fromList(row.getList(PER_RECIPIENT_SPECIFIC_HEADERS, TupleValue.class)); MailImpl mail = MailImpl.builder() .name(name) @@ -152,14 +153,16 @@ public class EnqueuedMailsDaoUtil { } } - private static PerRecipientHeaders fromHeaderMap(Map<String, UDTValue> rawMap) { + private static PerRecipientHeaders fromList(List<TupleValue> list) { PerRecipientHeaders result = new PerRecipientHeaders(); - rawMap.forEach((key, value) -> result.addHeaderForRecipient(PerRecipientHeaders.Header.builder() - .name(value.getString(HEADER_NAME)) - .value(value.getString(HEADER_VALUE)) - .build(), - toMailAddress(key))); + list.forEach(tuple -> + result.addHeaderForRecipient( + PerRecipientHeaders.Header.builder() + .name(tuple.getString(HEADER_NAME_INDEX)) + .value(tuple.getString(HEADER_VALUE_INDEX)) + .build(), + toMailAddress(tuple.getString(USER_INDEX)))); return result; } @@ -187,18 +190,11 @@ public class EnqueuedMailsDaoUtil { return ByteBuffer.wrap(attributeValue.toJson().toString().getBytes(StandardCharsets.UTF_8)); } - static ImmutableMap<String, UDTValue> toHeaderMap(CassandraTypesProvider cassandraTypesProvider, - PerRecipientHeaders perRecipientHeaders) { + static ImmutableList<TupleValue> toTupleList(TupleType userHeaderNameHeaderValueTriple, PerRecipientHeaders perRecipientHeaders) { return perRecipientHeaders.getHeadersByRecipient() - .asMap() - .entrySet() + .entries() .stream() - .flatMap(entry -> entry.getValue().stream().map(value -> Pair.of(entry.getKey(), value))) - .map(entry -> Pair.of(entry.getKey().asString(), - cassandraTypesProvider.getDefinedUserType(HEADER_TYPE) - .newValue() - .setString(HEADER_NAME, entry.getRight().getName()) - .setString(HEADER_VALUE, entry.getRight().getValue()))) - .collect(ImmutableMap.toImmutableMap(Pair::getLeft, Pair::getRight)); + .map(entry -> userHeaderNameHeaderValueTriple.newValue(entry.getKey().asString(), entry.getValue().getName(), entry.getValue().getValue())) + .collect(Guavate.toImmutableList()); } } diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java index 88f9eae..2e95d87 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java @@ -107,7 +107,6 @@ class RabbitMQMailQueueConfigurationChangeTest { private RabbitMQMailQueue getRabbitMQMailQueue(CassandraCluster cassandra, CassandraMailQueueViewConfiguration mailQueueViewConfiguration) throws Exception { CassandraMailQueueView.Factory mailQueueViewFactory = CassandraMailQueueViewTestFactory.factory(clock, cassandra.getConf(), - cassandra.getTypesProvider(), mailQueueViewConfiguration, mimeMessageStoreFactory); diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index 5f13186..6e9abdf 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -104,7 +104,7 @@ public class RabbitMQMailQueueTest implements ManageableMailQueueContract, MailQ MimeMessageStore.Factory mimeMessageStoreFactory = MimeMessageStore.factory(blobStore); clock = new UpdatableTickingClock(IN_SLICE_1); - MailQueueView.Factory mailQueueViewFactory = CassandraMailQueueViewTestFactory.factory(clock, cassandra.getConf(), cassandra.getTypesProvider(), + MailQueueView.Factory mailQueueViewFactory = CassandraMailQueueViewTestFactory.factory(clock, cassandra.getConf(), CassandraMailQueueViewConfiguration.builder() .bucketCount(THREE_BUCKET_COUNT) .updateBrowseStartPace(UPDATE_BROWSE_START_PACE) diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java index 9240be2..4181be1 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java @@ -22,7 +22,6 @@ package org.apache.james.queue.rabbitmq.view.cassandra; import java.time.Clock; import java.util.Optional; -import org.apache.james.backends.cassandra.init.CassandraTypesProvider; import org.apache.james.blob.api.HashBlobId; import org.apache.james.blob.mail.MimeMessageStore; import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStore; @@ -41,12 +40,11 @@ import reactor.core.publisher.Mono; public class CassandraMailQueueViewTestFactory { public static CassandraMailQueueView.Factory factory(Clock clock, Session session, - CassandraTypesProvider typesProvider, CassandraMailQueueViewConfiguration configuration, MimeMessageStore.Factory mimeMessageStoreFactory) { HashBlobId.Factory blobIdFactory = new HashBlobId.Factory(); - EnqueuedMailsDAO enqueuedMailsDao = new EnqueuedMailsDAO(session, typesProvider, blobIdFactory); + EnqueuedMailsDAO enqueuedMailsDao = new EnqueuedMailsDAO(session, blobIdFactory); BrowseStartDAO browseStartDao = new BrowseStartDAO(session); DeletedMailsDAO deletedMailsDao = new DeletedMailsDAO(session); diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java index 21d8820..9affadf 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/EnqueuedMailsDaoTest.java @@ -73,7 +73,7 @@ class EnqueuedMailsDaoTest { BlobId.Factory blobFactory = new HashBlobId.Factory(); testee = new EnqueuedMailsDAO( cassandra.getConf(), - cassandra.getTypesProvider(), blobFactory); + blobFactory); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
