Repository: james-project Updated Branches: refs/heads/master a5f3aa719 -> 33419f67a
JAMES-2083 Don't create migration threads when migration not expected Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/33419f67 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/33419f67 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/33419f67 Branch: refs/heads/master Commit: 33419f67a9ca96637c7ae94906981a8c38815665 Parents: a5f3aa7 Author: Antoine Duprat <[email protected]> Authored: Wed Jul 12 11:00:56 2017 +0200 Committer: Antoine Duprat <[email protected]> Committed: Thu Jul 13 09:42:36 2017 +0200 ---------------------------------------------------------------------- .../cassandra/mail/migration/V1ToV2Migration.java | 16 +++++++++++----- .../mail/migration/V1ToV2MigrationThread.java | 17 +---------------- 2 files changed, 12 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/33419f67/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java index 7522b03..e794a8e 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java @@ -39,11 +39,11 @@ import org.apache.james.mailbox.cassandra.mail.MessageAttachmentRepresentation; import org.apache.james.mailbox.cassandra.mail.MessageWithoutAttachment; import org.apache.james.mailbox.cassandra.mail.utils.Limit; import org.apache.james.mailbox.store.mail.MessageMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.github.fge.lambdas.Throwing; import com.google.common.collect.ImmutableList; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class V1ToV2Migration { @@ -64,9 +64,15 @@ public class V1ToV2Migration { this.migrationExecutor = Executors.newFixedThreadPool(cassandraConfiguration.getV1ToV2ThreadCount()); boolean ensureFifoOrder = false; this.messagesToBeMigrated = new ArrayBlockingQueue<>(cassandraConfiguration.getV1ToV2QueueLength(), ensureFifoOrder); - IntStream.range(0, cassandraConfiguration.getV1ToV2ThreadCount()) - .mapToObj(i -> new V1ToV2MigrationThread(messagesToBeMigrated, messageDAOV1, messageDAOV2, attachmentLoader)) - .forEach(migrationExecutor::execute); + executeMigrationThread(messageDAOV1, messageDAOV2, cassandraConfiguration); + } + + private void executeMigrationThread(CassandraMessageDAO messageDAOV1, CassandraMessageDAOV2 messageDAOV2, CassandraConfiguration cassandraConfiguration) { + if (cassandraConfiguration.isOnTheFlyV1ToV2Migration()) { + IntStream.range(0, cassandraConfiguration.getV1ToV2ThreadCount()) + .mapToObj(i -> new V1ToV2MigrationThread(messagesToBeMigrated, messageDAOV1, messageDAOV2, attachmentLoader)) + .forEach(migrationExecutor::execute); + } } @PreDestroy http://git-wip-us.apache.org/repos/asf/james-project/blob/33419f67/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java index 26c96d6..1b96179 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java @@ -58,7 +58,7 @@ public class V1ToV2MigrationThread implements Runnable { public void run() { while (true) { try { - Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> message = dequeue(); + Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> message = messagesToBeMigrated.take(); performV1ToV2Migration(message).join(); } catch (Exception e) { LOGGER.error("Error occured in migration thread", e); @@ -66,21 +66,6 @@ public class V1ToV2MigrationThread implements Runnable { } } - private Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> dequeue() { - while (true) { - Optional<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> poll = poll(); - if (poll.isPresent()) { - return poll.get(); - } - } - } - - private Optional<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> poll() { - synchronized (messagesToBeMigrated) { - return Optional.ofNullable(messagesToBeMigrated.poll()); - } - } - private CompletableFuture<Void> performV1ToV2Migration(Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> messageV1) { return attachmentLoader.addAttachmentToMessages(Stream.of(messageV1), MessageMapper.FetchType.Full) .thenApply(stream -> stream.findAny().get()) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
