JAMES-2272 Memory implementation for TaskManager
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/8c1a0671 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/8c1a0671 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/8c1a0671 Branch: refs/heads/master Commit: 8c1a06717a4fc694648a496e1cb0e2981a339ced Parents: fbce4b0 Author: benwa <[email protected]> Authored: Wed Dec 27 11:47:04 2017 +0700 Committer: benwa <[email protected]> Committed: Thu Jan 4 15:03:36 2018 +0700 ---------------------------------------------------------------------- .../apache/james/task/MemoryTaskManager.java | 156 +++++++ .../apache/james/task/TaskExecutionDetails.java | 28 +- .../james/task/MemoryTaskManagerTest.java | 434 +++++++++++++++++++ 3 files changed, 600 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/8c1a0671/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java ---------------------------------------------------------------------- diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java new file mode 100644 index 0000000..ddd37bb --- /dev/null +++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java @@ -0,0 +1,156 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.task; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.function.Consumer; + +import javax.annotation.PreDestroy; + +import org.apache.james.util.MDCBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.fge.lambdas.Throwing; +import com.github.steveash.guavate.Guavate; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; + +public class MemoryTaskManager implements TaskManager { + private static final boolean INTERRUPT_IF_RUNNING = true; + private static final Logger LOGGER = LoggerFactory.getLogger(MemoryTaskManager.class); + + private final ConcurrentHashMap<TaskId, TaskExecutionDetails> idToExecutionDetails; + private final ConcurrentHashMap<TaskId, Future> idToFuture; + private final ExecutorService executor; + + public MemoryTaskManager() { + idToExecutionDetails = new ConcurrentHashMap<>(); + idToFuture = new ConcurrentHashMap<>(); + executor = Executors.newSingleThreadExecutor(); + } + + @Override + public TaskId submit(Task task) { + return submit(task, id -> {}); + } + + @VisibleForTesting + TaskId submit(Task task, Consumer<TaskId> callback) { + TaskId taskId = TaskId.generateTaskId(); + TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, taskId); + + idToExecutionDetails.put(taskId, executionDetails); + idToFuture.put(taskId, + executor.submit(() -> runWithMdc(executionDetails, task, callback))); + return taskId; + } + + private void runWithMdc(TaskExecutionDetails executionDetails, Task task, Consumer<TaskId> callback) { + MDCBuilder.withMdc( + MDCBuilder.create() + .addContext(Task.TASK_ID, executionDetails.getTaskId()) + .addContext(Task.TASK_TYPE, executionDetails.getType()) + .addContext(Task.TASK_DETAILS, executionDetails.getAdditionalInformation()), + () -> run(executionDetails, task, callback)); + } + + private void run(TaskExecutionDetails executionDetails, Task task, Consumer<TaskId> callback) { + TaskExecutionDetails started = executionDetails.start(); + idToExecutionDetails.put(started.getTaskId(), started); + try { + task.run() + .onComplete(() -> success(started)) + .onFailure(() -> failed(started, + logger -> logger.info("Task was partially performed. Check logs for more details"))); + } catch (Exception e) { + failed(started, + logger -> logger.error("Error while running task", executionDetails, e)); + } finally { + idToFuture.remove(executionDetails.getTaskId()); + callback.accept(executionDetails.getTaskId()); + } + } + + private void success(TaskExecutionDetails started) { + if (!wasCancelled(started.getTaskId())) { + idToExecutionDetails.put(started.getTaskId(), started.completed()); + LOGGER.info("Task success"); + } + } + + private void failed(TaskExecutionDetails started, Consumer<Logger> logOperation) { + if (!wasCancelled(started.getTaskId())) { + idToExecutionDetails.put(started.getTaskId(), started.failed()); + logOperation.accept(LOGGER); + } + } + + private boolean wasCancelled(TaskId taskId) { + return idToExecutionDetails.get(taskId).getStatus() == Status.CANCELLED; + } + + @Override + public TaskExecutionDetails getExecutionDetails(TaskId id) { + return Optional.ofNullable(idToExecutionDetails.get(id)) + .orElseThrow(TaskNotFoundException::new); + } + + @Override + public List<TaskExecutionDetails> list() { + return ImmutableList.copyOf(idToExecutionDetails.values()); + } + + @Override + public List<TaskExecutionDetails> list(Status status) { + return idToExecutionDetails.values() + .stream() + .filter(details -> details.getStatus().equals(status)) + .collect(Guavate.toImmutableList()); + } + + @Override + public void cancel(TaskId id) { + Optional.ofNullable(idToFuture.get(id)) + .ifPresent(future -> { + TaskExecutionDetails executionDetails = idToExecutionDetails.get(id); + idToExecutionDetails.put(id, executionDetails.cancel()); + future.cancel(INTERRUPT_IF_RUNNING); + idToFuture.remove(id); + }); + } + + @Override + public TaskExecutionDetails await(TaskId id) { + Optional.ofNullable(idToFuture.get(id)) + .ifPresent(Throwing.consumer(Future::get)); + return getExecutionDetails(id); + } + + @PreDestroy + public void stop() { + executor.shutdownNow(); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/8c1a0671/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java ---------------------------------------------------------------------- diff --git a/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java index 0122fa7..a95627c 100644 --- a/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java +++ b/server/task/src/main/java/org/apache/james/task/TaskExecutionDetails.java @@ -33,9 +33,8 @@ public class TaskExecutionDetails { public static TaskExecutionDetails from(Task task, TaskId id) { return new TaskExecutionDetails( id, - task.type(), + task, TaskManager.Status.WAITING, - task.details(), Optional.of(ZonedDateTime.now()), Optional.empty(), Optional.empty(), @@ -44,24 +43,21 @@ public class TaskExecutionDetails { } private final TaskId taskId; - private final String type; + private final Task task; private final TaskManager.Status status; - private final Optional<AdditionalInformation> additionalInformation; private final Optional<ZonedDateTime> submitDate; private final Optional<ZonedDateTime> startedDate; private final Optional<ZonedDateTime> completedDate; private final Optional<ZonedDateTime> canceledDate; private final Optional<ZonedDateTime> failedDate; - public TaskExecutionDetails(TaskId taskId, String type, TaskManager.Status status, - Optional<AdditionalInformation> additionalInformation, + public TaskExecutionDetails(TaskId taskId, Task task, TaskManager.Status status, Optional<ZonedDateTime> submitDate, Optional<ZonedDateTime> startedDate, Optional<ZonedDateTime> completedDate, Optional<ZonedDateTime> canceledDate, Optional<ZonedDateTime> failedDate) { this.taskId = taskId; - this.type = type; + this.task = task; this.status = status; - this.additionalInformation = additionalInformation; this.submitDate = submitDate; this.startedDate = startedDate; this.completedDate = completedDate; @@ -74,7 +70,7 @@ public class TaskExecutionDetails { } public String getType() { - return type; + return task.type(); } public TaskManager.Status getStatus() { @@ -82,7 +78,7 @@ public class TaskExecutionDetails { } public Optional<AdditionalInformation> getAdditionalInformation() { - return additionalInformation; + return task.details(); } public Optional<ZonedDateTime> getSubmitDate() { @@ -109,9 +105,8 @@ public class TaskExecutionDetails { Preconditions.checkState(status == TaskManager.Status.WAITING); return new TaskExecutionDetails( taskId, - type, + task, TaskManager.Status.IN_PROGRESS, - additionalInformation, submitDate, Optional.of(ZonedDateTime.now()), Optional.empty(), @@ -123,9 +118,8 @@ public class TaskExecutionDetails { Preconditions.checkState(status == TaskManager.Status.IN_PROGRESS); return new TaskExecutionDetails( taskId, - type, + task, TaskManager.Status.COMPLETED, - additionalInformation, submitDate, startedDate, Optional.of(ZonedDateTime.now()), @@ -137,9 +131,8 @@ public class TaskExecutionDetails { Preconditions.checkState(status == TaskManager.Status.IN_PROGRESS); return new TaskExecutionDetails( taskId, - type, + task, TaskManager.Status.FAILED, - additionalInformation, submitDate, startedDate, Optional.empty(), @@ -152,9 +145,8 @@ public class TaskExecutionDetails { || status == TaskManager.Status.WAITING); return new TaskExecutionDetails( taskId, - type, + task, TaskManager.Status.CANCELLED, - additionalInformation, submitDate, startedDate, Optional.empty(), http://git-wip-us.apache.org/repos/asf/james-project/blob/8c1a0671/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java ---------------------------------------------------------------------- diff --git a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java new file mode 100644 index 0000000..0b7005a --- /dev/null +++ b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java @@ -0,0 +1,434 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.task; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import org.assertj.core.api.JUnitSoftAssertions; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import com.github.fge.lambdas.Throwing; +import com.github.fge.lambdas.consumers.ConsumerChainer; +import com.google.common.base.Throwables; + +public class MemoryTaskManagerTest { + + private MemoryTaskManager memoryTaskManager; + + @Rule + public JUnitSoftAssertions softly = new JUnitSoftAssertions(); + + @Before + public void setUp() { + memoryTaskManager = new MemoryTaskManager(); + } + + @After + public void tearDown() { + memoryTaskManager.stop(); + } + + @Test + public void getStatusShouldReturnUnknownWhenUnknownId() { + TaskId unknownId = TaskId.generateTaskId(); + assertThatThrownBy(() -> memoryTaskManager.getExecutionDetails(unknownId)) + .isInstanceOf(TaskNotFoundException.class); + } + + @Test + public void getStatusShouldReturnWaitingWhenNotYetProcessed() { + CountDownLatch task1Latch = new CountDownLatch(1); + + memoryTaskManager.submit(() -> { + await(task1Latch); + return Task.Result.COMPLETED; + }); + + TaskId taskId = memoryTaskManager.submit(() -> Task.Result.COMPLETED); + + assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus()) + .isEqualTo(TaskManager.Status.WAITING); + } + + @Test + public void taskCodeAfterCancelIsNotRun() { + CountDownLatch task1Latch = new CountDownLatch(1); + AtomicInteger count = new AtomicInteger(0); + + TaskId id = memoryTaskManager.submit(() -> { + await(task1Latch); + count.incrementAndGet(); + return Task.Result.COMPLETED; + }); + + memoryTaskManager.cancel(id); + task1Latch.countDown(); + + assertThat(count.get()).isEqualTo(0); + } + + @Test + public void getStatusShouldReturnCancelledWhenCancelled() throws Exception { + CountDownLatch task1Latch = new CountDownLatch(1); + CountDownLatch ensureStartedLatch = new CountDownLatch(1); + CountDownLatch ensureFinishedLatch = new CountDownLatch(1); + + TaskId id = memoryTaskManager.submit(() -> { + ensureStartedLatch.countDown(); + await(task1Latch); + return Task.Result.COMPLETED; + }, + any -> ensureFinishedLatch.countDown()); + + ensureStartedLatch.await(); + memoryTaskManager.cancel(id); + ensureFinishedLatch.await(); + + assertThat(memoryTaskManager.getExecutionDetails(id).getStatus()) + .isEqualTo(TaskManager.Status.CANCELLED); + } + + @Test + public void cancelShouldBeIdempotent() { + CountDownLatch task1Latch = new CountDownLatch(1); + + TaskId id = memoryTaskManager.submit(() -> { + await(task1Latch); + return Task.Result.COMPLETED; + }); + + memoryTaskManager.cancel(id); + assertThatCode(() -> memoryTaskManager.cancel(id)) + .doesNotThrowAnyException(); + } + + @Test + public void getStatusShouldReturnInProgressWhenProcessingIsInProgress() throws Exception { + CountDownLatch latch1 = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(1); + + TaskId taskId = memoryTaskManager.submit(() -> { + latch2.countDown(); + await(latch1); + return Task.Result.COMPLETED; + }); + latch2.await(); + + assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus()) + .isEqualTo(TaskManager.Status.IN_PROGRESS); + } + + @Test + public void getStatusShouldReturnCompletedWhenRunSuccessfully() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + + TaskId taskId = memoryTaskManager.submit( + () -> Task.Result.COMPLETED, + countDownCallback(latch)); + + latch.await(); + + assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus()) + .isEqualTo(TaskManager.Status.COMPLETED); + } + + @Test + public void getStatusShouldReturnFailedWhenRunPartially() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + + TaskId taskId = memoryTaskManager.submit( + () -> Task.Result.PARTIAL, + countDownCallback(latch)); + + latch.await(); + + assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus()) + .isEqualTo(TaskManager.Status.FAILED); + } + + private ConsumerChainer<TaskId> countDownCallback(CountDownLatch latch) { + return Throwing.consumer(id -> latch.countDown()); + } + + @Test + public void listShouldReturnTaskSatus() throws Exception { + CountDownLatch latch1 = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(1); + CountDownLatch latch3 = new CountDownLatch(1); + + TaskId failedId = memoryTaskManager.submit( + () -> Task.Result.PARTIAL); + TaskId successfulId = memoryTaskManager.submit( + () -> Task.Result.COMPLETED); + TaskId inProgressId = memoryTaskManager.submit( + () -> { + await(latch1); + latch2.countDown(); + await(latch3); + return Task.Result.COMPLETED; + }); + TaskId waitingId = memoryTaskManager.submit( + () -> { + await(latch3); + latch2.countDown(); + return Task.Result.COMPLETED; + }); + + latch1.countDown(); + latch2.await(); + + List<TaskExecutionDetails> list = memoryTaskManager.list(); + softly.assertThat(list).hasSize(4); + softly.assertThat(entryWithId(list, failedId)) + .isEqualTo(TaskManager.Status.FAILED); + softly.assertThat(entryWithId(list, waitingId)) + .isEqualTo(TaskManager.Status.WAITING); + softly.assertThat(entryWithId(list, successfulId)) + .isEqualTo(TaskManager.Status.COMPLETED); + softly.assertThat(entryWithId(list, inProgressId)) + .isEqualTo(TaskManager.Status.IN_PROGRESS); + } + + private TaskManager.Status entryWithId(List<TaskExecutionDetails> list, TaskId taskId) { + return list.stream() + .filter(e -> e.getTaskId().equals(taskId)) + .findFirst().get() + .getStatus(); + } + + @Test + public void listShouldAllowToSeeWaitingTasks() throws Exception { + CountDownLatch latch1 = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(1); + CountDownLatch latch3 = new CountDownLatch(1); + + memoryTaskManager.submit( + () -> Task.Result.PARTIAL); + memoryTaskManager.submit( + () -> Task.Result.COMPLETED); + memoryTaskManager.submit( + () -> { + await(latch1); + latch2.countDown(); + await(latch3); + return Task.Result.COMPLETED; + }); + TaskId waitingId = memoryTaskManager.submit( + () -> { + await(latch3); + latch2.countDown(); + return Task.Result.COMPLETED; + }); + + latch1.countDown(); + latch2.await(); + + assertThat(memoryTaskManager.list(TaskManager.Status.WAITING)) + .extracting(TaskExecutionDetails::getTaskId) + .containsOnly(waitingId); + } + + @Test + public void listShouldAllowToSeeInProgressTasks() throws Exception { + CountDownLatch latch1 = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(1); + CountDownLatch latch3 = new CountDownLatch(1); + + memoryTaskManager.submit( + () -> Task.Result.PARTIAL); + TaskId successfulId = memoryTaskManager.submit( + () -> Task.Result.COMPLETED); + memoryTaskManager.submit( + () -> { + await(latch1); + latch2.countDown(); + await(latch3); + return Task.Result.COMPLETED; + }); + memoryTaskManager.submit( + () -> { + await(latch3); + latch2.countDown(); + return Task.Result.COMPLETED; + }); + + latch1.countDown(); + latch2.await(); + + assertThat(memoryTaskManager.list(TaskManager.Status.COMPLETED)) + .extracting(TaskExecutionDetails::getTaskId) + .containsOnly(successfulId); + } + + @Test + public void listShouldAllowToSeeFailedTasks() throws Exception { + CountDownLatch latch1 = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(1); + CountDownLatch latch3 = new CountDownLatch(1); + + TaskId failedId = memoryTaskManager.submit( + () -> Task.Result.PARTIAL); + memoryTaskManager.submit( + () -> Task.Result.COMPLETED); + memoryTaskManager.submit( + () -> { + await(latch1); + latch2.countDown(); + await(latch3); + return Task.Result.COMPLETED; + }); + memoryTaskManager.submit( + () -> { + await(latch3); + latch2.countDown(); + return Task.Result.COMPLETED; + }); + + latch1.countDown(); + latch2.await(); + + assertThat(memoryTaskManager.list(TaskManager.Status.FAILED)) + .extracting(TaskExecutionDetails::getTaskId) + .containsOnly(failedId); + } + + @Test + public void listShouldAllowToSeeSuccessfulTasks() throws Exception { + CountDownLatch latch1 = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(1); + CountDownLatch latch3 = new CountDownLatch(1); + + memoryTaskManager.submit( + () -> Task.Result.PARTIAL); + memoryTaskManager.submit( + () -> Task.Result.COMPLETED); + TaskId inProgressId = memoryTaskManager.submit( + () -> { + await(latch1); + latch2.countDown(); + await(latch3); + return Task.Result.COMPLETED; + }); + memoryTaskManager.submit( + () -> { + await(latch3); + latch2.countDown(); + return Task.Result.COMPLETED; + }); + + latch1.countDown(); + latch2.await(); + + assertThat(memoryTaskManager.list(TaskManager.Status.IN_PROGRESS)) + .extracting(TaskExecutionDetails::getTaskId) + .containsOnly(inProgressId); + } + + @Test + public void listShouldBeEmptyWhenNoTasks() throws Exception { + assertThat(memoryTaskManager.list()).isEmpty(); + } + + @Test + public void listCancelledShouldBeEmptyWhenNoTasks() throws Exception { + assertThat(memoryTaskManager.list(TaskManager.Status.CANCELLED)).isEmpty(); + } + + @Test + public void awaitShouldNotThrowWhenCompletedTask() throws Exception { + TaskId taskId = memoryTaskManager.submit( + () -> Task.Result.COMPLETED); + memoryTaskManager.await(taskId); + memoryTaskManager.await(taskId); + } + + @Test + public void submittedTaskShouldExecuteSequentially() { + ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>(); + + TaskId id1 = memoryTaskManager.submit(() -> { + queue.add(1); + sleep(500); + queue.add(2); + return Task.Result.COMPLETED; + }); + TaskId id2 = memoryTaskManager.submit(() -> { + queue.add(3); + sleep(500); + queue.add(4); + return Task.Result.COMPLETED; + }); + memoryTaskManager.await(id1); + memoryTaskManager.await(id2); + + assertThat(queue) + .containsExactly(1, 2, 3, 4); + } + + @Test + public void awaitShouldReturnFailedWhenExceptionThrown() { + TaskId taskId = memoryTaskManager.submit(() -> { + throw new RuntimeException(); + }); + + TaskExecutionDetails executionDetails = memoryTaskManager.await(taskId); + + assertThat(executionDetails.getStatus()) + .isEqualTo(TaskManager.Status.FAILED); + } + + @Test + public void getStatusShouldReturnFailedWhenExceptionThrown() { + TaskId taskId = memoryTaskManager.submit(() -> { + throw new RuntimeException(); + }); + + memoryTaskManager.await(taskId); + + assertThat(memoryTaskManager.getExecutionDetails(taskId).getStatus()) + .isEqualTo(TaskManager.Status.FAILED); + } + + public void sleep(int durationInMs) { + try { + Thread.sleep(durationInMs); + } catch (InterruptedException e) { + Throwables.propagate(e); + } + } + + public void await(CountDownLatch countDownLatch) { + try { + countDownLatch.await(); + } catch (InterruptedException e) { + Throwables.propagate(e); + } + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
