JAMES-2294 Routes for mail reprocessing
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/c73d5de8 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/c73d5de8 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/c73d5de8 Branch: refs/heads/master Commit: c73d5de8df02bc638dbcfe5916e63163dd3b5111 Parents: e6fdeae Author: benwa <[email protected]> Authored: Thu Jan 25 10:57:41 2018 +0700 Committer: benwa <[email protected]> Committed: Fri Jan 26 08:12:05 2018 +0700 ---------------------------------------------------------------------- .../webadmin/routes/MailRepositoriesRoutes.java | 139 +++- .../routes/MailRepositoriesRoutesTest.java | 646 ++++++++++++++++++- 2 files changed, 755 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/c73d5de8/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java ---------------------------------------------------------------------- diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java index 1078206..a90323b 100644 --- a/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java +++ b/server/protocols/webadmin/webadmin-mailrepository/src/main/java/org/apache/james/webadmin/routes/MailRepositoriesRoutes.java @@ -19,6 +19,7 @@ package org.apache.james.webadmin.routes; +import java.io.UnsupportedEncodingException; import java.net.URLDecoder; import java.nio.charset.StandardCharsets; import java.util.List; @@ -32,6 +33,7 @@ import javax.ws.rs.Path; import javax.ws.rs.Produces; import 
org.apache.james.mailrepository.api.MailRepositoryStore; +import org.apache.james.queue.api.MailQueueFactory; import org.apache.james.task.Task; import org.apache.james.task.TaskId; import org.apache.james.task.TaskManager; @@ -42,6 +44,9 @@ import org.apache.james.webadmin.Routes; import org.apache.james.webadmin.dto.ExtendedMailRepositoryResponse; import org.apache.james.webadmin.dto.TaskIdDto; import org.apache.james.webadmin.service.MailRepositoryStoreService; +import org.apache.james.webadmin.service.ReprocessingAllMailsTask; +import org.apache.james.webadmin.service.ReprocessingOneMailTask; +import org.apache.james.webadmin.service.ReprocessingService; import org.apache.james.webadmin.utils.ErrorResponder; import org.apache.james.webadmin.utils.ErrorResponder.ErrorType; import org.apache.james.webadmin.utils.JsonTransformer; @@ -55,6 +60,7 @@ import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; +import io.swagger.jaxrs.PATCH; import spark.Request; import spark.Service; @@ -67,13 +73,15 @@ public class MailRepositoriesRoutes implements Routes { private final JsonTransformer jsonTransformer; private final MailRepositoryStoreService repositoryStoreService; + private final ReprocessingService reprocessingService; private final TaskManager taskManager; private Service service; @Inject - public MailRepositoriesRoutes(MailRepositoryStoreService repositoryStoreService, JsonTransformer jsonTransformer, TaskManager taskManager) { + public MailRepositoriesRoutes(MailRepositoryStoreService repositoryStoreService, JsonTransformer jsonTransformer, ReprocessingService reprocessingService, TaskManager taskManager) { this.repositoryStoreService = repositoryStoreService; this.jsonTransformer = jsonTransformer; + this.reprocessingService = reprocessingService; this.taskManager = taskManager; } @@ -92,6 +100,10 @@ public class MailRepositoriesRoutes 
implements Routes { defineDeleteMail(); defineDeleteAll(); + + defineReprocessAll(); + + defineReprocessOne(); } @GET @@ -168,7 +180,7 @@ public class MailRepositoriesRoutes implements Routes { }) public void defineGetMail() { service.get(MAIL_REPOSITORIES + "/:encodedUrl/mails/:mailKey", (request, response) -> { - String url = URLDecoder.decode(request.params("encodedUrl"), StandardCharsets.UTF_8.displayName()); + String url = decodedRepositoryUrl(request); String mailKey = request.params("mailKey"); try { return repositoryStoreService.retrieveMail(url, mailKey) @@ -228,7 +240,7 @@ public class MailRepositoriesRoutes implements Routes { }) public void defineDeleteMail() { service.delete(MAIL_REPOSITORIES + "/:encodedUrl/mails/:mailKey", (request, response) -> { - String url = URLDecoder.decode(request.params("encodedUrl"), StandardCharsets.UTF_8.displayName()); + String url = decodedRepositoryUrl(request); String mailKey = request.params("mailKey"); try { response.status(HttpStatus.NO_CONTENT_204); @@ -255,7 +267,7 @@ public class MailRepositoriesRoutes implements Routes { }) public void defineDeleteAll() { service.delete(MAIL_REPOSITORIES + "/:encodedUrl/mails", (request, response) -> { - String url = URLDecoder.decode(request.params("encodedUrl"), StandardCharsets.UTF_8.displayName()); + String url = decodedRepositoryUrl(request); try { Task task = repositoryStoreService.createClearMailRepositoryTask(url); TaskId taskId = taskManager.submit(task); @@ -271,6 +283,125 @@ public class MailRepositoriesRoutes implements Routes { }, jsonTransformer); } + @PATCH + @Path("/{encodedUrl}/mails") + @ApiOperation(value = "Reprocessing all mails in that mailRepository") + @ApiImplicitParams({ + @ApiImplicitParam( + required = true, + name = "action", + paramType = "query parameter", + dataType = "String", + defaultValue = "none", + example = "?action=reprocess", + value = "Compulsory. 
Only supported value is `reprocess`"), + @ApiImplicitParam( + required = false, + name = "queue", + paramType = "query parameter", + dataType = "String", + defaultValue = "spool", + example = "?queue=outgoing", + value = "Indicates in which queue the mails stored in the repository should be re-enqueued"), + @ApiImplicitParam( + required = false, + paramType = "query parameter", + name = "processor", + dataType = "String", + defaultValue = "absent", + example = "?processor=transport", + value = "If present, modifies the state property of the mail to allow their processing by a specific mail container processor.") + }) + @ApiResponses(value = { + @ApiResponse(code = HttpStatus.CREATED_201, message = "Task is created", response = TaskIdDto.class), + @ApiResponse(code = HttpStatus.INTERNAL_SERVER_ERROR_500, message = "Internal server error - Something went bad on the server side."), + @ApiResponse(code = HttpStatus.BAD_REQUEST_400, message = "Bad request - unknown action") + }) + public void defineReprocessAll() { + service.patch(MAIL_REPOSITORIES + "/:encodedUrl/mails", (request, response) -> { + Task task = toAllMailReprocessingTask(request); + TaskId taskId = taskManager.submit(task); + return TaskIdDto.respond(response, taskId); + }, jsonTransformer); + } + + private Task toAllMailReprocessingTask(Request request) throws UnsupportedEncodingException, MailRepositoryStore.MailRepositoryStoreException, MessagingException { + String url = decodedRepositoryUrl(request); + enforceActionParameter(request); + Optional<String> targetProcessor = Optional.ofNullable(request.queryParams("processor")); + String targetQueue = Optional.ofNullable(request.queryParams("queue")).orElse(MailQueueFactory.SPOOL); + + Long repositorySize = repositoryStoreService.size(url).orElse(0L); + return new ReprocessingAllMailsTask(reprocessingService, repositorySize, url, targetQueue, targetProcessor); + } + + @PATCH + @Path("/{encodedUrl}/mails/{key}") + @ApiOperation(value = "Reprocessing a 
single mail in that mailRepository") + @ApiImplicitParams({ + @ApiImplicitParam( + required = true, + name = "action", + paramType = "query parameter", + dataType = "String", + defaultValue = "none", + example = "?action=reprocess", + value = "Compulsory. Only supported value is `reprocess`"), + @ApiImplicitParam( + required = false, + name = "queue", + paramType = "query parameter", + dataType = "String", + defaultValue = "spool", + example = "?queue=outgoing", + value = "Indicates in which queue the mails stored in the repository should be re-enqueued"), + @ApiImplicitParam( + required = false, + paramType = "query parameter", + name = "processor", + dataType = "String", + defaultValue = "absent", + example = "?processor=transport", + value = "If present, modifies the state property of the mail to allow their processing by a specific mail container processor.") + }) + @ApiResponses(value = { + @ApiResponse(code = HttpStatus.CREATED_201, message = "Task is created", response = TaskIdDto.class), + @ApiResponse(code = HttpStatus.INTERNAL_SERVER_ERROR_500, message = "Internal server error - Something went bad on the server side."), + @ApiResponse(code = HttpStatus.BAD_REQUEST_400, message = "Bad request - unknown action") + }) + public void defineReprocessOne() { + service.patch(MAIL_REPOSITORIES + "/:encodedUrl/mails/:key", (request, response) -> { + Task task = toOneMailReprocessingTask(request); + TaskId taskId = taskManager.submit(task); + return TaskIdDto.respond(response, taskId); + }, jsonTransformer); + } + + private Task toOneMailReprocessingTask(Request request) throws UnsupportedEncodingException { + String url = decodedRepositoryUrl(request); + String key = request.params("key"); + enforceActionParameter(request); + Optional<String> targetProcessor = Optional.ofNullable(request.queryParams("processor")); + String targetQueue = Optional.ofNullable(request.queryParams("queue")).orElse(MailQueueFactory.SPOOL); + + return new 
ReprocessingOneMailTask(reprocessingService, url, targetQueue, key, targetProcessor); + } + + private void enforceActionParameter(Request request) { + String action = request.queryParams("action"); + if (!"reprocess".equals(action)) { + throw ErrorResponder.builder() + .statusCode(HttpStatus.BAD_REQUEST_400) + .type(ErrorType.INVALID_ARGUMENT) + .message("action query parameter is mandatory. The only supported value is `reprocess`") + .haltError(); + } + } + + private String decodedRepositoryUrl(Request request) throws UnsupportedEncodingException { + return URLDecoder.decode(request.params("encodedUrl"), StandardCharsets.UTF_8.displayName()); + } + private Optional<Integer> assertPositiveInteger(Request request, String parameterName) { try { return Optional.ofNullable(request.queryParams(parameterName)) http://git-wip-us.apache.org/repos/asf/james-project/blob/c73d5de8/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java ---------------------------------------------------------------------- diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java index cbad2b4..a6dd4de 100644 --- a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java +++ b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java @@ -41,13 +41,21 @@ import java.util.Optional; import org.apache.james.mailrepository.api.MailRepositoryStore; import org.apache.james.mailrepository.memory.MemoryMailRepository; import org.apache.james.metrics.api.NoopMetricFactory; +import org.apache.james.queue.api.MailQueueFactory; +import org.apache.james.queue.api.ManageableMailQueue; 
+import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory; +import org.apache.james.queue.memory.MemoryMailQueueFactory; import org.apache.james.task.MemoryTaskManager; import org.apache.james.webadmin.WebAdminServer; import org.apache.james.webadmin.WebAdminUtils; import org.apache.james.webadmin.service.ClearMailRepositoryTask; import org.apache.james.webadmin.service.MailRepositoryStoreService; +import org.apache.james.webadmin.service.ReprocessingAllMailsTask; +import org.apache.james.webadmin.service.ReprocessingOneMailTask; +import org.apache.james.webadmin.service.ReprocessingService; import org.apache.james.webadmin.utils.ErrorResponder; import org.apache.james.webadmin.utils.JsonTransformer; +import org.apache.mailet.Mail; import org.apache.mailet.base.test.FakeMail; import org.eclipse.jetty.http.HttpStatus; import org.junit.After; @@ -63,9 +71,14 @@ public class MailRepositoriesRoutesTest { public static final String URL_MY_REPO = "url://myRepo"; public static final String URL_ESCAPED_MY_REPO = "url%3A%2F%2FmyRepo"; public static final String MY_REPO_MAILS = "url%3A%2F%2FmyRepo/mails"; + public static final String CUSTOM_QUEUE = "customQueue"; + public static final String NAME_1 = "name1"; + public static final String NAME_2 = "name2"; private WebAdminServer webAdminServer; private MailRepositoryStore mailRepositoryStore; private MemoryMailRepository mailRepository; + private ManageableMailQueue spoolQueue; + private ManageableMailQueue customQueue; @Before public void setUp() throws Exception { @@ -74,10 +87,18 @@ public class MailRepositoriesRoutesTest { MemoryTaskManager taskManager = new MemoryTaskManager(); JsonTransformer jsonTransformer = new JsonTransformer(); + MailQueueFactory<ManageableMailQueue> queueFactory = new MemoryMailQueueFactory(new RawMailQueueItemDecoratorFactory()); + spoolQueue = queueFactory.createQueue(MailQueueFactory.SPOOL); + customQueue = queueFactory.createQueue(CUSTOM_QUEUE); + + MailRepositoryStoreService 
repositoryStoreService = new MailRepositoryStoreService(mailRepositoryStore); + + ReprocessingService reprocessingService = new ReprocessingService(queueFactory, repositoryStoreService); + webAdminServer = WebAdminUtils.createWebAdminServer( new NoopMetricFactory(), - new MailRepositoriesRoutes(new MailRepositoryStoreService(mailRepositoryStore), - jsonTransformer, taskManager), + new MailRepositoriesRoutes(repositoryStoreService, + jsonTransformer, reprocessingService, taskManager), new TasksRoutes(taskManager, jsonTransformer)); webAdminServer.configure(NO_CONFIGURATION); webAdminServer.await(); @@ -289,10 +310,10 @@ public class MailRepositoriesRoutesTest { when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); mailRepository.store(FakeMail.builder() - .name("name1") + .name(NAME_1) .build()); mailRepository.store(FakeMail.builder() - .name("name2") + .name(NAME_2) .build()); when() @@ -300,7 +321,7 @@ public class MailRepositoriesRoutesTest { .then() .statusCode(HttpStatus.OK_200) .body("", hasSize(2)) - .body("mailKey", containsInAnyOrder("name1", "name2")); + .body("mailKey", containsInAnyOrder(NAME_1, NAME_2)); } @Test @@ -354,7 +375,7 @@ public class MailRepositoriesRoutesTest { when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); mailRepository.store(FakeMail.builder() - .name("name1") + .name(NAME_1) .build()); given() @@ -369,7 +390,7 @@ public class MailRepositoriesRoutesTest { public void retrievingAMailShouldDisplayItsInformation() throws Exception { when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); - String name = "name1"; + String name = NAME_1; String sender = "sender@domain"; String recipient1 = "recipient1@domain"; String recipient2 = "recipient2@domain"; @@ -398,16 +419,15 @@ public class MailRepositoriesRoutesTest { public void retrievingAMailShouldNotFailWhenOnlyNameProperty() throws Exception { 
when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); - String name = "name1"; mailRepository.store(FakeMail.builder() - .name(name) + .name(NAME_1) .build()); when() - .get(URL_ESCAPED_MY_REPO + "/mails/" + name) + .get(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1) .then() .statusCode(HttpStatus.OK_200) - .body("name", is(name)) + .body("name", is(NAME_1)) .body("sender", isEmptyOrNullString()) .body("state", isEmptyOrNullString()) .body("error", isEmptyOrNullString()) @@ -432,37 +452,34 @@ public class MailRepositoriesRoutesTest { public void deletingAMailShouldRemoveIt() throws Exception { when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); - String name1 = "name1"; - String name2 = "name2"; mailRepository.store(FakeMail.builder() - .name(name1) + .name(NAME_1) .build()); mailRepository.store(FakeMail.builder() - .name(name2) + .name(NAME_2) .build()); given() - .delete(URL_ESCAPED_MY_REPO + "/mails/" + name1); + .delete(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1); when() .get(URL_ESCAPED_MY_REPO + "/mails") .then() .statusCode(HttpStatus.OK_200) .body("", hasSize(1)) - .body("mailKey", contains(name2)); + .body("mailKey", contains(NAME_2)); } @Test public void deletingAMailShouldReturnOkWhenExist() throws Exception { when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); - String name1 = "name1"; mailRepository.store(FakeMail.builder() - .name(name1) + .name(NAME_1) .build()); when() - .delete(URL_ESCAPED_MY_REPO + "/mails/" + name1) + .delete(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1) .then() .statusCode(HttpStatus.NO_CONTENT_204); } @@ -493,13 +510,11 @@ public class MailRepositoriesRoutesTest { public void clearTaskShouldHaveDetails() throws Exception { when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); - String name1 = "name1"; - String name2 = "name2"; mailRepository.store(FakeMail.builder() - .name(name1) + .name(NAME_1) .build()); 
mailRepository.store(FakeMail.builder() - .name(name2) + .name(NAME_2) .build()); String taskId = with() @@ -528,10 +543,10 @@ public class MailRepositoriesRoutesTest { when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); mailRepository.store(FakeMail.builder() - .name("name1") + .name(NAME_1) .build()); mailRepository.store(FakeMail.builder() - .name("name2") + .name(NAME_2) .build()); String taskId = with() @@ -576,4 +591,583 @@ public class MailRepositoriesRoutesTest { .body("message", is(URL_MY_REPO + "does not exist")); } + @Test + public void reprocessingAllTaskShouldCreateATask() throws Exception { + when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); + + when() + .patch(URL_ESCAPED_MY_REPO + "/mails?action=reprocess") + .then() + .statusCode(HttpStatus.CREATED_201) + .header("Location", is(notNullValue())) + .body("taskId", is(notNullValue())); + } + + @Test + public void reprocessingAllTaskShouldRejectInvalidActions() throws Exception { + when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); + + when() + .patch(URL_ESCAPED_MY_REPO + "/mails?action=invalid") + .then() + .statusCode(HttpStatus.BAD_REQUEST_400) + .body("statusCode", is(400)) + .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType())) + .body("message", is("action query parameter is mandatory. The only supported value is `reprocess`")); + } + + @Test + public void reprocessingAllTaskShouldRequireAction() throws Exception { + when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); + + when() + .patch(URL_ESCAPED_MY_REPO + "/mails") + .then() + .statusCode(HttpStatus.BAD_REQUEST_400) + .body("statusCode", is(400)) + .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType())) + .body("message", is("action query parameter is mandatory. 
The only supported value is `reprocess`")); + } + + @Test + public void reprocessingAllTaskShouldIncludeDetailsWhenDefaultValues() throws Exception { + when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); + mailRepository.store(FakeMail.builder() + .name(NAME_1) + .build()); + mailRepository.store(FakeMail.builder() + .name(NAME_2) + .build()); + + String taskId = with() + .patch(URL_ESCAPED_MY_REPO + "/mails?action=reprocess") + .jsonPath() + .get("taskId"); + + given() + .basePath(TasksRoutes.BASE) + .when() + .get(taskId + "/await") + .then() + .body("status", is("completed")) + .body("taskId", is(notNullValue())) + .body("type", is(ReprocessingAllMailsTask.TYPE)) + .body("additionalInformation.repositoryUrl", is(URL_MY_REPO)) + .body("additionalInformation.initialCount", is(2)) + .body("additionalInformation.remainingCount", is(0)) + .body("additionalInformation.targetProcessor", isEmptyOrNullString()) + .body("additionalInformation.targetQueue", is(MailQueueFactory.SPOOL)) + .body("startedDate", is(notNullValue())) + .body("submitDate", is(notNullValue())) + .body("completedDate", is(notNullValue())); + } + + @Test + public void reprocessingAllTaskShouldIncludeDetails() throws Exception { + when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); + String name1 = "name1"; + String name2 = "name2"; + mailRepository.store(FakeMail.builder() + .name(name1) + .build()); + mailRepository.store(FakeMail.builder() + .name(name2) + .build()); + + String transport = "transport"; + String taskId = with() + .patch(URL_ESCAPED_MY_REPO + "/mails?action=reprocess" + + "&queue=" + CUSTOM_QUEUE + + "&processor=" + transport) + .jsonPath() + .get("taskId"); + + given() + .basePath(TasksRoutes.BASE) + .when() + .get(taskId + "/await") + .then() + .body("status", is("completed")) + .body("taskId", is(notNullValue())) + .body("type", is(ReprocessingAllMailsTask.TYPE)) + .body("additionalInformation.repositoryUrl", 
is(URL_MY_REPO)) + .body("additionalInformation.initialCount", is(2)) + .body("additionalInformation.remainingCount", is(0)) + .body("additionalInformation.targetProcessor", is(transport)) + .body("additionalInformation.targetQueue", is(CUSTOM_QUEUE)) + .body("startedDate", is(notNullValue())) + .body("submitDate", is(notNullValue())) + .body("completedDate", is(notNullValue())); + } + + @Test + public void reprocessingAllTaskShouldClearMailRepository() throws Exception { + when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); + String name1 = "name1"; + String name2 = "name2"; + mailRepository.store(FakeMail.builder() + .name(name1) + .build()); + mailRepository.store(FakeMail.builder() + .name(name2) + .build()); + + String transport = "transport"; + String taskId = with() + .patch(URL_ESCAPED_MY_REPO + "/mails?action=reprocess" + + "&queue=" + CUSTOM_QUEUE + + "&processor=" + transport) + .jsonPath() + .get("taskId"); + + with() + .basePath(TasksRoutes.BASE) + .get(taskId + "/await"); + + assertThat(mailRepository.list()).isEmpty(); + } + + @Test + public void reprocessingAllTaskShouldEnqueueMailsOnDefaultQueue() throws Exception { + when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); + mailRepository.store(FakeMail.builder() + .name(NAME_1) + .build()); + mailRepository.store(FakeMail.builder() + .name(NAME_2) + .build()); + + String taskId = with() + .patch(URL_ESCAPED_MY_REPO + "/mails?action=reprocess") + .jsonPath() + .get("taskId"); + + with() + .basePath(TasksRoutes.BASE) + .get(taskId + "/await"); + + assertThat(spoolQueue.browse()) + .extracting(ManageableMailQueue.MailQueueItemView::getMail) + .extracting(Mail::getName) + .containsOnly(NAME_1, NAME_2); + } + + @Test + public void reprocessingAllTaskShouldPreserveStateWhenProcessorIsNotSpecified() throws Exception { + when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); + String state1 = "state1"; + String 
state2 = "state2"; + mailRepository.store(FakeMail.builder() + .name(NAME_1) + .state(state1) + .build()); + mailRepository.store(FakeMail.builder() + .name(NAME_2) + .state(state2) + .build()); + + String taskId = with() + .patch(URL_ESCAPED_MY_REPO + "/mails?action=reprocess") + .jsonPath() + .get("taskId"); + + with() + .basePath(TasksRoutes.BASE) + .get(taskId + "/await"); + + assertThat(spoolQueue.browse()) + .extracting(ManageableMailQueue.MailQueueItemView::getMail) + .extracting(Mail::getState) + .containsOnly(state1, state2); + } + + @Test + public void reprocessingAllTaskShouldOverWriteStateWhenProcessorSpecified() throws Exception { + when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); + String state1 = "state1"; + String state2 = "state2"; + mailRepository.store(FakeMail.builder() + .name(NAME_1) + .state(state1) + .build()); + mailRepository.store(FakeMail.builder() + .name(NAME_2) + .state(state2) + .build()); + + String transport = "transport"; + String taskId = with() + .patch(URL_ESCAPED_MY_REPO + "/mails?action=reprocess" + + "&processor=" + transport) + .jsonPath() + .get("taskId"); + + with() + .basePath(TasksRoutes.BASE) + .get(taskId + "/await"); + + assertThat(spoolQueue.browse()) + .extracting(ManageableMailQueue.MailQueueItemView::getMail) + .extracting(Mail::getState) + .containsOnly(transport, transport); + } + + @Test + public void reprocessingAllTaskShouldEnqueueMailsOnSpecifiedQueue() throws Exception { + when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); + mailRepository.store(FakeMail.builder() + .name(NAME_1) + .build()); + mailRepository.store(FakeMail.builder() + .name(NAME_2) + .build()); + + String taskId = with() + .patch(URL_ESCAPED_MY_REPO + "/mails?action=reprocess" + + "&queue=" + CUSTOM_QUEUE) + .jsonPath() + .get("taskId"); + + with() + .basePath(TasksRoutes.BASE) + .get(taskId + "/await"); + + assertThat(customQueue.browse()) + 
.extracting(ManageableMailQueue.MailQueueItemView::getMail) + .extracting(Mail::getName) + .containsOnly(NAME_1, NAME_2); + } + + @Test + public void reprocessingOneTaskShouldCreateATask() throws Exception { + when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); + + mailRepository.store(FakeMail.builder() + .name(NAME_1) + .build()); + + when() + .patch(URL_ESCAPED_MY_REPO + "/mails/name1?action=reprocess") + .then() + .statusCode(HttpStatus.CREATED_201) + .header("Location", is(notNullValue())) + .body("taskId", is(notNullValue())); + } + + @Test + public void reprocessingOneTaskShouldRejectInvalidActions() throws Exception { + when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); + + mailRepository.store(FakeMail.builder() + .name(NAME_1) + .build()); + + when() + .patch(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1 + "?action=invalid") + .then() + .statusCode(HttpStatus.BAD_REQUEST_400) + .body("statusCode", is(400)) + .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType())) + .body("message", is("action query parameter is mandatory. The only supported value is `reprocess`")); + } + + @Test + public void reprocessingOneTaskShouldRequireAction() throws Exception { + when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); + + mailRepository.store(FakeMail.builder() + .name(NAME_1) + .build()); + + when() + .patch(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1) + .then() + .statusCode(HttpStatus.BAD_REQUEST_400) + .body("statusCode", is(400)) + .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType())) + .body("message", is("action query parameter is mandatory. 
The only supported value is `reprocess`")); + } + + @Test + public void reprocessingOneTaskShouldIncludeDetailsWhenDefaultValues() throws Exception { + when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); + String name1 = "name1"; + String name2 = "name2"; + mailRepository.store(FakeMail.builder() + .name(name1) + .build()); + mailRepository.store(FakeMail.builder() + .name(name2) + .build()); + + String taskId = with() + .patch(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1 + "?action=reprocess") + .jsonPath() + .get("taskId"); + + given() + .basePath(TasksRoutes.BASE) + .when() + .get(taskId + "/await") + .then() + .body("status", is("completed")) + .body("taskId", is(notNullValue())) + .body("type", is(ReprocessingOneMailTask.TYPE)) + .body("additionalInformation.repositoryUrl", is(URL_MY_REPO)) + .body("additionalInformation.mailKey", is(NAME_1)) + .body("additionalInformation.targetProcessor", isEmptyOrNullString()) + .body("additionalInformation.targetQueue", is(MailQueueFactory.SPOOL)) + .body("startedDate", is(notNullValue())) + .body("submitDate", is(notNullValue())) + .body("completedDate", is(notNullValue())); + } + + @Test + public void reprocessingOneTaskShouldIncludeDetails() throws Exception { + when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); + String name1 = "name1"; + String name2 = "name2"; + mailRepository.store(FakeMail.builder() + .name(name1) + .build()); + mailRepository.store(FakeMail.builder() + .name(name2) + .build()); + + String transport = "transport"; + String taskId = with() + .patch(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1 + "?action=reprocess" + + "&queue=" + CUSTOM_QUEUE + + "&processor=" + transport) + .jsonPath() + .get("taskId"); + + given() + .basePath(TasksRoutes.BASE) + .when() + .get(taskId + "/await") + .then() + .body("status", is("completed")) + .body("taskId", is(notNullValue())) + .body("type", is(ReprocessingOneMailTask.TYPE)) + 
.body("additionalInformation.repositoryUrl", is(URL_MY_REPO)) + .body("additionalInformation.mailKey", is(NAME_1)) + .body("additionalInformation.targetProcessor", is(transport)) + .body("additionalInformation.targetQueue", is(CUSTOM_QUEUE)) + .body("startedDate", is(notNullValue())) + .body("submitDate", is(notNullValue())) + .body("completedDate", is(notNullValue())); + } + + @Test + public void reprocessingOneTaskShouldRemoveMailFromRepository() throws Exception { + when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); + String name1 = "name1"; + String name2 = "name2"; + mailRepository.store(FakeMail.builder() + .name(name1) + .build()); + mailRepository.store(FakeMail.builder() + .name(name2) + .build()); + + String transport = "transport"; + String taskId = with() + .patch(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1 + "?action=reprocess" + + "&queue=" + CUSTOM_QUEUE + + "&processor=" + transport) + .jsonPath() + .get("taskId"); + + with() + .basePath(TasksRoutes.BASE) + .get(taskId + "/await"); + + assertThat(mailRepository.list()) + .containsOnly(NAME_2); + } + + @Test + public void reprocessingOneTaskShouldEnqueueMailsOnDefaultQueue() throws Exception { + when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); + mailRepository.store(FakeMail.builder() + .name(NAME_1) + .build()); + mailRepository.store(FakeMail.builder() + .name(NAME_2) + .build()); + + String taskId = with() + .patch(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1 + "?action=reprocess") + .jsonPath() + .get("taskId"); + + with() + .basePath(TasksRoutes.BASE) + .get(taskId + "/await"); + + assertThat(spoolQueue.browse()) + .extracting(ManageableMailQueue.MailQueueItemView::getMail) + .extracting(Mail::getName) + .containsOnly(NAME_1); + } + + @Test + public void reprocessingOneTaskShouldPreserveStateWhenProcessorIsNotSpecified() throws Exception { + when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); + String 
state1 = "state1"; + String state2 = "state2"; + mailRepository.store(FakeMail.builder() + .name(NAME_1) + .state(state1) + .build()); + mailRepository.store(FakeMail.builder() + .name(NAME_2) + .state(state2) + .build()); + + String taskId = with() + .patch(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1 + "?action=reprocess") + .jsonPath() + .get("taskId"); + + with() + .basePath(TasksRoutes.BASE) + .get(taskId + "/await"); + + assertThat(spoolQueue.browse()) + .extracting(ManageableMailQueue.MailQueueItemView::getMail) + .extracting(Mail::getState) + .containsOnly(state1); + } + + @Test + public void reprocessingOneTaskShouldOverWriteStateWhenProcessorSpecified() throws Exception { + when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); + String state1 = "state1"; + String state2 = "state2"; + mailRepository.store(FakeMail.builder() + .name(NAME_1) + .state(state1) + .build()); + mailRepository.store(FakeMail.builder() + .name(NAME_2) + .state(state2) + .build()); + + String transport = "transport"; + String taskId = with() + .patch(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1 + "?action=reprocess" + + "&processor=" + transport) + .jsonPath() + .get("taskId"); + + with() + .basePath(TasksRoutes.BASE) + .get(taskId + "/await"); + + assertThat(spoolQueue.browse()) + .extracting(ManageableMailQueue.MailQueueItemView::getMail) + .extracting(Mail::getState) + .containsOnly(transport); + } + + @Test + public void reprocessingOneTaskShouldEnqueueMailsOnSpecifiedQueue() throws Exception { + when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); + mailRepository.store(FakeMail.builder() + .name(NAME_1) + .build()); + mailRepository.store(FakeMail.builder() + .name(NAME_2) + .build()); + + String taskId = with() + .patch(URL_ESCAPED_MY_REPO + "/mails/" + NAME_1 + "?action=reprocess" + + "&queue=" + CUSTOM_QUEUE) + .jsonPath() + .get("taskId"); + + with() + .basePath(TasksRoutes.BASE) + .get(taskId + "/await"); + + 
assertThat(customQueue.browse()) + .extracting(ManageableMailQueue.MailQueueItemView::getMail) + .extracting(Mail::getName) + .containsOnly(NAME_1); + } + + @Test + public void reprocessingOneTaskShouldNotEnqueueUnknownMailKey() throws Exception { + when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); + mailRepository.store(FakeMail.builder() + .name(NAME_1) + .build()); + mailRepository.store(FakeMail.builder() + .name(NAME_2) + .build()); + + String taskId = with() + .patch(URL_ESCAPED_MY_REPO + "/mails/" + "unknown" + "?action=reprocess" + + "&queue=" + CUSTOM_QUEUE) + .jsonPath() + .get("taskId"); + + with() + .basePath(TasksRoutes.BASE) + .get(taskId + "/await"); + + assertThat(customQueue.browse()) + .isEmpty(); + } + + @Test + public void reprocessingOneTaskShouldNotRemoveMailFromRepositoryWhenUnknownMailKey() throws Exception { + when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); + mailRepository.store(FakeMail.builder() + .name(NAME_1) + .build()); + mailRepository.store(FakeMail.builder() + .name(NAME_2) + .build()); + + String taskId = with() + .patch(URL_ESCAPED_MY_REPO + "/mails/" + "unknown" + "?action=reprocess" + + "&queue=" + CUSTOM_QUEUE) + .jsonPath() + .get("taskId"); + + with() + .basePath(TasksRoutes.BASE) + .get(taskId + "/await"); + + assertThat(mailRepository.size()) + .isEqualTo(2); + } + + @Test + public void reprocessingOneTaskShouldFailWhenUnknownMailKey() throws Exception { + when(mailRepositoryStore.get(URL_MY_REPO)).thenReturn(Optional.of(mailRepository)); + mailRepository.store(FakeMail.builder() + .name(NAME_1) + .build()); + mailRepository.store(FakeMail.builder() + .name(NAME_2) + .build()); + + String taskId = with() + .patch(URL_ESCAPED_MY_REPO + "/mails/" + "unknown" + "?action=reprocess" + + "&queue=" + CUSTOM_QUEUE) + .jsonPath() + .get("taskId"); + + given() + .basePath(TasksRoutes.BASE) + .when() + .get(taskId + "/await") + .then() + .body("status", 
is("failed")); + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org For additional commands, e-mail: server-dev-help@james.apache.org
