Repository: james-project Updated Branches: refs/heads/master 9e66ddb13 -> b70ca1a7c
JAMES-2544 RabbitMQ browse `MailQueueView` API and cassandra table definitions Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/0523acf4 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/0523acf4 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/0523acf4 Branch: refs/heads/master Commit: 0523acf435ff5985cc39a1e5b0454d0bd45a8285 Parents: 9e66ddb Author: duc <[email protected]> Authored: Wed Sep 12 20:12:47 2018 +0700 Committer: Benoit Tellier <[email protected]> Committed: Wed Sep 26 09:21:47 2018 +0700 ---------------------------------------------------------------------- server/queue/queue-rabbitmq/pom.xml | 1 - .../james/queue/rabbitmq/MailQueueName.java | 6 +- .../queue/rabbitmq/view/api/MailQueueView.java | 36 +++++ .../CassandraMailQueueViewConfiguration.java | 91 +++++++++++ .../cassandra/CassandraMailQueueViewModule.java | 121 ++++++++++++++ .../view/cassandra/model/BucketedSlices.java | 129 +++++++++++++++ .../view/cassandra/model/EnqueuedMail.java | 156 +++++++++++++++++++ .../rabbitmq/view/cassandra/model/MailKey.java | 48 ++++++ .../cassandra/model/BucketedSlicesTest.java | 69 ++++++++ 9 files changed, 653 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/0523acf4/server/queue/queue-rabbitmq/pom.xml ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/pom.xml b/server/queue/queue-rabbitmq/pom.xml index 2bc2a58..47b958b 100644 --- a/server/queue/queue-rabbitmq/pom.xml +++ b/server/queue/queue-rabbitmq/pom.xml @@ -50,7 +50,6 @@ <dependency> <groupId>${james.groupId}</groupId> <artifactId>apache-james-backends-cassandra</artifactId> - <scope>test</scope> </dependency> <dependency> <groupId>${james.groupId}</groupId> http://git-wip-us.apache.org/repos/asf/james-project/blob/0523acf4/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java index e059e3f..5c060de 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java @@ -26,7 +26,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; -final class MailQueueName { +public final class MailQueueName { static class WorkQueueName { static Optional<WorkQueueName> fromString(String name) { @@ -114,7 +114,7 @@ final class MailQueueName { private static final String EXCHANGE_PREFIX = PREFIX + "-exchange-"; @VisibleForTesting static final String WORKQUEUE_PREFIX = PREFIX + "-workqueue-"; - static MailQueueName fromString(String name) { + public static MailQueueName fromString(String name) { Preconditions.checkNotNull(name); return new MailQueueName(name); } @@ -130,7 +130,7 @@ final class MailQueueName { this.name = name; } - String asString() { + public String asString() { return name; } http://git-wip-us.apache.org/repos/asf/james-project/blob/0523acf4/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java new file mode 100644 index 0000000..0d70239 --- /dev/null +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java @@ -0,0 +1,36 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.queue.rabbitmq.view.api; + +import java.util.concurrent.CompletableFuture; + +import org.apache.james.queue.api.ManageableMailQueue; +import org.apache.mailet.Mail; + +public interface MailQueueView { + + CompletableFuture<Void> storeMail(Mail mail); + + CompletableFuture<Void> deleteMail(Mail mail); + + ManageableMailQueue.MailQueueIterator browse(); + + long getSize(); +} http://git-wip-us.apache.org/repos/asf/james-project/blob/0523acf4/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewConfiguration.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewConfiguration.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewConfiguration.java new file mode 100644 index 0000000..b138887 --- /dev/null +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewConfiguration.java @@ -0,0 +1,91 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.queue.rabbitmq.view.cassandra; + +import java.time.Duration; + +import com.google.common.base.Preconditions; + +public class CassandraMailQueueViewConfiguration { + interface Builder { + @FunctionalInterface + interface RequireBucketCount { + RequireUpdateBrowseStartPace bucketCount(int bucketCount); + } + + @FunctionalInterface + interface RequireUpdateBrowseStartPace { + RequireSliceWindow updateBrowseStartPace(int updateBrowseStartPace); + } + + @FunctionalInterface + interface RequireSliceWindow { + ReadyToBuild sliceWindow(Duration sliceWindow); + } + + class ReadyToBuild { + private final int bucketCount; + private final int updateBrowseStartPace; + private final Duration sliceWindow; + + private ReadyToBuild(int bucketCount, int updateBrowseStartPace, Duration sliceWindow) { + this.bucketCount = bucketCount; + this.updateBrowseStartPace = updateBrowseStartPace; + this.sliceWindow = sliceWindow; + } + + public CassandraMailQueueViewConfiguration build() { + Preconditions.checkNotNull(sliceWindow, "'sliceWindow' is compulsory"); + Preconditions.checkState(bucketCount > 0, "'bucketCount' needs to be a strictly positive integer"); + Preconditions.checkState(updateBrowseStartPace > 0, "'updateBrowseStartPace' needs to be a strictly positive integer"); + + return new CassandraMailQueueViewConfiguration(bucketCount, updateBrowseStartPace, sliceWindow); + } + } + } + + public static Builder.RequireBucketCount builder() { + return bucketCount -> updateBrowseStartPace -> sliceWindow -> new Builder.ReadyToBuild(bucketCount, updateBrowseStartPace, sliceWindow); + } + + private final int bucketCount; + private final int updateBrowseStartPace; + private final Duration sliceWindow; + + private CassandraMailQueueViewConfiguration(int bucketCount, + int updateBrowseStartPace, + Duration sliceWindow) { + this.bucketCount = bucketCount; + this.updateBrowseStartPace = updateBrowseStartPace; + this.sliceWindow = sliceWindow; + } + + public int getUpdateBrowseStartPace() { + return updateBrowseStartPace; + } + + public int getBucketCount() { + return bucketCount; + } + + public Duration getSliceWindow() { + return sliceWindow; + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/0523acf4/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewModule.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewModule.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewModule.java new file mode 100644 index 0000000..6460eb4 --- /dev/null +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewModule.java @@ -0,0 +1,121 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.queue.rabbitmq.view.cassandra; + +import static com.datastax.driver.core.DataType.blob; +import static com.datastax.driver.core.DataType.cint; +import static com.datastax.driver.core.DataType.list; +import static com.datastax.driver.core.DataType.map; +import static com.datastax.driver.core.DataType.text; +import static com.datastax.driver.core.DataType.timestamp; +import static com.datastax.driver.core.schemabuilder.SchemaBuilder.frozen; + +import org.apache.james.backends.cassandra.components.CassandraModule; + +public interface CassandraMailQueueViewModule { + + interface EnqueuedMailsTable { + String TABLE_NAME = "enqueuedMails"; + + String QUEUE_NAME = "queueName"; + String TIME_RANGE_START = "timeRangeStart"; + String BUCKET_ID = "bucketId"; + + String ENQUEUED_TIME = "enqueuedTime"; + String MAIL_KEY = "mailKey"; + String HEADER_BLOB_ID = "headerBlobId"; + String BODY_BLOB_ID = "bodyBlobId"; + String STATE = "state"; + String SENDER = "sender"; + String RECIPIENTS = "recipients"; + String ATTRIBUTES = "attributes"; + String ERROR_MESSAGE = "errorMessage"; + String REMOTE_HOST = "remoteHost"; + String REMOTE_ADDR = "remoteAddr"; + String LAST_UPDATED = "lastUpdated"; + String PER_RECIPIENT_SPECIFIC_HEADERS = "perRecipientSpecificHeaders"; + + String HEADER_TYPE = "header"; + String HEADER_NAME = "headerName"; + String HEADER_VALUE = "headerValue"; + } + + interface BrowseStartTable { + String TABLE_NAME = "browseStart"; + + String QUEUE_NAME = "queueName"; + String BROWSE_START = "browseStart"; + } + + interface DeletedMailTable { + String TABLE_NAME = "deletedMails"; + + String QUEUE_NAME = "queueName"; + String MAIL_KEY = "mailKey"; + } + + CassandraModule MODULE = CassandraModule + .type(EnqueuedMailsTable.HEADER_TYPE) + .statement(statement -> statement + .addColumn(EnqueuedMailsTable.HEADER_NAME, text()) + .addColumn(EnqueuedMailsTable.HEADER_VALUE, text())) + .table(EnqueuedMailsTable.TABLE_NAME) + .comment("store enqueued mails, if a mail is enqueued into a mail queue, it also being stored in this table," + + " when a mail is dequeued from a mail queue, the record associated with that mail still available in this" + + " table and will not be deleted immediately regarding to the performance impacts," + + " but after some scheduled tasks") + .options(options -> options) + .statement(statement -> statement + .addPartitionKey(EnqueuedMailsTable.QUEUE_NAME, text()) + .addPartitionKey(EnqueuedMailsTable.TIME_RANGE_START, timestamp()) + .addPartitionKey(EnqueuedMailsTable.BUCKET_ID, cint()) + .addClusteringColumn(EnqueuedMailsTable.MAIL_KEY, text()) + .addColumn(EnqueuedMailsTable.ENQUEUED_TIME, timestamp()) + .addColumn(EnqueuedMailsTable.STATE, text()) + .addColumn(EnqueuedMailsTable.HEADER_BLOB_ID, text()) + .addColumn(EnqueuedMailsTable.BODY_BLOB_ID, text()) + .addColumn(EnqueuedMailsTable.ATTRIBUTES, map(text(), blob())) + .addColumn(EnqueuedMailsTable.ERROR_MESSAGE, text()) + .addColumn(EnqueuedMailsTable.SENDER, text()) + .addColumn(EnqueuedMailsTable.RECIPIENTS, list(text())) + .addColumn(EnqueuedMailsTable.REMOTE_HOST, text()) + .addColumn(EnqueuedMailsTable.REMOTE_ADDR, text()) + .addColumn(EnqueuedMailsTable.LAST_UPDATED, timestamp()) + .addUDTMapColumn(EnqueuedMailsTable.PER_RECIPIENT_SPECIFIC_HEADERS, text(), frozen(EnqueuedMailsTable.HEADER_TYPE))) + + .table(BrowseStartTable.TABLE_NAME) + .comment("this table allows to find the starting point of iteration from the table: " + + EnqueuedMailsTable.TABLE_NAME + " in order to make a browse operations through mail queues") + .options(options -> options) + .statement(statement -> statement + .addPartitionKey(BrowseStartTable.QUEUE_NAME, text()) + .addColumn(BrowseStartTable.BROWSE_START, timestamp())) + + .table(DeletedMailTable.TABLE_NAME) + .comment("this table stores the dequeued mails, while browsing mail from table: " + + EnqueuedMailsTable.TABLE_NAME + " we need to filter out mails have been dequeued by checking their " + + "existence in this table") + .options(options -> options) + .statement(statement -> statement + .addPartitionKey(DeletedMailTable.QUEUE_NAME, text()) + .addPartitionKey(DeletedMailTable.MAIL_KEY, text())) + + .build(); +} http://git-wip-us.apache.org/repos/asf/james-project/blob/0523acf4/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java new file mode 100644 index 0000000..f7d5ac1 --- /dev/null +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlices.java @@ -0,0 +1,129 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.queue.rabbitmq.view.cassandra.model; + +import java.time.Instant; +import java.util.Objects; +import java.util.stream.LongStream; +import java.util.stream.Stream; + +import com.google.common.base.Preconditions; + +public class BucketedSlices { + + public static class BucketId { + + public static BucketId of(int bucketId) { + return new BucketId(bucketId); + } + + private final int value; + + private BucketId(int value) { + Preconditions.checkArgument(value >= 0, "sliceWindowSizeInSecond should not be negative"); + + this.value = value; + } + + public int getValue() { + return value; + } + + @Override + public final boolean equals(Object o) { + if (o instanceof BucketId) { + BucketId bucketId = (BucketId) o; + + return Objects.equals(this.value, bucketId.value); + } + return false; + } + + @Override + public final int hashCode() { + return Objects.hash(value); + } + } + + public static class Slice { + + public static Slice of(Instant sliceStartInstant, long sliceWindowSizeInSecond) { + return new Slice(sliceStartInstant, sliceWindowSizeInSecond); + } + + public static Stream<Slice> allSlicesTill(Slice firstSlice, Instant endAt) { + long sliceCount = calculateSliceCount(firstSlice, endAt); + long startAtSeconds = firstSlice.getStartSliceInstant().getEpochSecond(); + long sliceWindowSizeInSecond = firstSlice.getSliceWindowSizeInSecond(); + + return LongStream.range(0, sliceCount) + .map(slicePosition -> startAtSeconds + sliceWindowSizeInSecond * slicePosition) + .mapToObj(Instant::ofEpochSecond) + .map(sliceStartInstant -> Slice.of(sliceStartInstant, firstSlice.getSliceWindowSizeInSecond())); + } + + private static long calculateSliceCount(Slice firstSlice, Instant endAt) { + long startAtSeconds = firstSlice.getStartSliceInstant().getEpochSecond(); + long endAtSeconds = endAt.getEpochSecond(); + long timeDiffInSecond = endAtSeconds - startAtSeconds; + + if (timeDiffInSecond < 0) { + return 0; + } else { + return (timeDiffInSecond / firstSlice.sliceWindowSizeInSecond) + 1; + } + } + + private final Instant startSliceInstant; + private final long sliceWindowSizeInSecond; + + private Slice(Instant startSliceInstant, long sliceWindowSizeInSecond) { + Preconditions.checkNotNull(startSliceInstant); + Preconditions.checkArgument(sliceWindowSizeInSecond > 0, "sliceWindowSizeInSecond should be positive"); + + this.startSliceInstant = startSliceInstant; + this.sliceWindowSizeInSecond = sliceWindowSizeInSecond; + } + + public Instant getStartSliceInstant() { + return startSliceInstant; + } + + public long getSliceWindowSizeInSecond() { + return sliceWindowSizeInSecond; + } + + @Override + public final boolean equals(Object o) { + if (o instanceof Slice) { + Slice slice = (Slice) o; + + return Objects.equals(this.sliceWindowSizeInSecond, slice.sliceWindowSizeInSecond) + && Objects.equals(this.startSliceInstant, slice.startSliceInstant); + } + return false; + } + + @Override + public final int hashCode() { + return Objects.hash(startSliceInstant, sliceWindowSizeInSecond); + } + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/0523acf4/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMail.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMail.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMail.java new file mode 100644 index 0000000..6e022b3 --- /dev/null +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedMail.java @@ -0,0 +1,156 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.queue.rabbitmq.view.cassandra.model; + +import java.time.Instant; +import java.util.Comparator; +import java.util.Objects; + +import org.apache.james.queue.rabbitmq.MailQueueName; +import org.apache.mailet.Mail; + +public class EnqueuedMail { + + public interface Builder { + + @FunctionalInterface + interface RequireMail { + RequireBucketId mail(Mail mail); + } + + @FunctionalInterface + interface RequireBucketId { + RequireTimeRangeStart bucketId(BucketedSlices.BucketId bucketId); + } + + @FunctionalInterface + interface RequireTimeRangeStart { + RequireEnqueuedTime timeRangeStart(Instant timeRangeStart); + } + + @FunctionalInterface + interface RequireEnqueuedTime { + RequireMailKey enqueuedTime(Instant enqueuedTime); + } + + @FunctionalInterface + interface RequireMailKey { + RequireMailQueueName mailKey(MailKey mailKey); + } + + @FunctionalInterface + interface RequireMailQueueName { + LastStage mailQueueName(MailQueueName mailQueueName); + } + + class LastStage { + private Mail mail; + private BucketedSlices.BucketId bucketId; + private Instant timeRangeStart; + private Instant enqueuedTime; + private MailKey mailKey; + private MailQueueName mailQueueName; + + private LastStage(Mail mail, BucketedSlices.BucketId bucketId, + Instant timeRangeStart, Instant enqueuedTime, + MailKey mailKey, MailQueueName mailQueueName) { + this.mail = mail; + this.bucketId = bucketId; + this.timeRangeStart = timeRangeStart; + this.enqueuedTime = enqueuedTime; + this.mailKey = mailKey; + this.mailQueueName = mailQueueName; + } + + public EnqueuedMail build() { + return new EnqueuedMail(mail, bucketId, timeRangeStart, enqueuedTime, mailKey, mailQueueName); + } + } + } + + public static Builder.RequireMail builder() { + return mail -> bucketId -> timeRangeStart -> enqueuedTime -> mailKey -> mailQueueName -> + new Builder.LastStage(mail, bucketId, timeRangeStart, enqueuedTime, mailKey, mailQueueName); + } + + public static Comparator<EnqueuedMail> getEnqueuedTimeComparator() { + return Comparator.comparing(EnqueuedMail::getEnqueuedTime); + } + + private final Mail mail; + private final BucketedSlices.BucketId bucketId; + private final Instant timeRangeStart; + private final Instant enqueuedTime; + private final MailKey mailKey; + private final MailQueueName mailQueueName; + + private EnqueuedMail(Mail mail, BucketedSlices.BucketId bucketId, Instant timeRangeStart, + Instant enqueuedTime, MailKey mailKey, MailQueueName mailQueueName) { + this.mail = mail; + this.bucketId = bucketId; + this.timeRangeStart = timeRangeStart; + this.enqueuedTime = enqueuedTime; + this.mailKey = mailKey; + this.mailQueueName = mailQueueName; + } + + public Mail getMail() { + return mail; + } + + public BucketedSlices.BucketId getBucketId() { + return bucketId; + } + + public MailKey getMailKey() { + return mailKey; + } + + public MailQueueName getMailQueueName() { + return mailQueueName; + } + + public Instant getTimeRangeStart() { + return timeRangeStart; + } + + public Instant getEnqueuedTime() { + return enqueuedTime; + } + + @Override + public final boolean equals(Object o) { + if (o instanceof EnqueuedMail) { + EnqueuedMail that = (EnqueuedMail) o; + + return Objects.equals(this.bucketId, that.bucketId) + && Objects.equals(this.mail, that.mail) + && Objects.equals(this.timeRangeStart, that.timeRangeStart) + && Objects.equals(this.mailKey, that.mailKey) + && Objects.equals(this.mailQueueName, that.mailQueueName); + } + return false; + } + + @Override + public final int hashCode() { + return Objects.hash(mail, bucketId, timeRangeStart, mailKey, mailQueueName); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/0523acf4/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKey.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKey.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKey.java new file mode 100644 index 0000000..c39dc16 --- /dev/null +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/MailKey.java @@ -0,0 +1,48 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.queue.rabbitmq.view.cassandra.model; + +import org.apache.mailet.Mail; + +import com.google.common.base.Preconditions; + +public class MailKey { + + public static MailKey fromMail(Mail mail) { + return of(mail.getName()); + } + + public static MailKey of(String mailKey) { + return new MailKey(mailKey); + } + + private final String mailKey; + + private MailKey(String mailKey) { + Preconditions.checkNotNull(mailKey); + Preconditions.checkArgument(!mailKey.isEmpty()); + + this.mailKey = mailKey; + } + + public String getMailKey() { + return mailKey; + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/0523acf4/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java new file mode 100644 index 0000000..110d870 --- /dev/null +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/model/BucketedSlicesTest.java @@ -0,0 +1,69 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.queue.rabbitmq.view.cassandra.model; + +import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice; +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Instant; +import java.util.stream.Stream; + +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +class BucketedSlicesTest { + + private static final long ONE_HOUR_IN_SECONDS = 3600; + + private static final Instant FIRST_SLICE_INSTANT = Instant.parse("2018-05-20T12:00:00.000Z"); + private static final Instant FIRST_SLICE_INSTANT_NEXT_HOUR = FIRST_SLICE_INSTANT.plusSeconds(ONE_HOUR_IN_SECONDS); + private static final Instant FIRST_SLICE_INSTANT_NEXT_TWO_HOUR = FIRST_SLICE_INSTANT.plusSeconds(ONE_HOUR_IN_SECONDS * 2); + + private static final Slice FIRST_SLICE = Slice.of(FIRST_SLICE_INSTANT, ONE_HOUR_IN_SECONDS); + private static final Slice FIRST_SLICE_NEXT_TWO_HOUR = Slice.of(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR, ONE_HOUR_IN_SECONDS); + + @Nested + class Validation { + } + + @Test + void allSlicesTillShouldReturnOnlyFirstSliceWhenEndAtInTheSameInterval() { + assertThat(Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT.plusSeconds(3599))) + .containsOnly(FIRST_SLICE); + } + + @Test + void allSlicesTillShouldReturnAllSlicesBetweenStartAndEndAt() { + Stream<Slice> allSlices = Slice.allSlicesTill(FIRST_SLICE, FIRST_SLICE_INSTANT_NEXT_TWO_HOUR.plusSeconds(3599)); + + assertThat(allSlices) + .containsExactly( + FIRST_SLICE, + Slice.of(FIRST_SLICE_INSTANT_NEXT_HOUR, ONE_HOUR_IN_SECONDS), + Slice.of(FIRST_SLICE_INSTANT_NEXT_TWO_HOUR, ONE_HOUR_IN_SECONDS)); + } + + @Test + void allSlicesTillShouldReturnEmptyIfEndAtBeforeStartSlice() { + Stream<Slice> allSlices = Slice.allSlicesTill(FIRST_SLICE_NEXT_TWO_HOUR, FIRST_SLICE_INSTANT); + + assertThat(allSlices).isEmpty(); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
