This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 2aa3780338026e45290d2fe2627279fafef8d6ba Author: Tran Tien Duc <dt...@linagora.com> AuthorDate: Tue Mar 26 19:24:34 2019 +0700 JAMES-2685 Zipper.ContentLoader should return Optional<InputStream> To make the zipper aware of the emptiness of the content of deleted messages --- .../apache/james/vault/DeletedMessageZipper.java | 20 ++--- .../james/vault/DeletedMessageZipperTest.java | 26 +++++-- pom.xml | 4 +- .../routes/DeletedMessagesVaultExportTask.java | 6 +- .../james/webadmin/vault/routes/ExportService.java | 87 ++++++---------------- .../routes/DeletedMessagesVaultRoutesTest.java | 3 +- 6 files changed, 58 insertions(+), 88 deletions(-) diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageZipper.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageZipper.java index 22e3fd2..1d4a89b 100644 --- a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageZipper.java +++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageZipper.java @@ -23,6 +23,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Optional; import java.util.stream.Stream; import org.apache.commons.compress.archivers.zip.ExtraFieldUtils; @@ -39,7 +40,7 @@ import com.google.common.annotations.VisibleForTesting; public class DeletedMessageZipper { public interface DeletedMessageContentLoader { - InputStream load(DeletedMessage deletedMessage); + Optional<InputStream> load(DeletedMessage deletedMessage); } public DeletedMessageZipper() { @@ -49,7 +50,8 @@ public class DeletedMessageZipper { public void zip(DeletedMessageContentLoader contentLoader, Stream<DeletedMessage> deletedMessages, OutputStream outputStream) throws IOException { try (ZipArchiveOutputStream zipOutputStream = newZipArchiveOutputStream(outputStream)) { - ThrowingConsumer<DeletedMessage> putInZip 
= message -> putMessageToEntry(zipOutputStream, message, contentLoader); + ThrowingConsumer<DeletedMessage> putInZip = + message -> putMessageToEntry(zipOutputStream, message, contentLoader.load(message)); deletedMessages.forEach(Throwing.consumer(putInZip).sneakyThrow()); @@ -63,13 +65,13 @@ public class DeletedMessageZipper { } @VisibleForTesting - void putMessageToEntry(ZipArchiveOutputStream zipOutputStream, DeletedMessage message, DeletedMessageContentLoader contentLoader) throws IOException { - try (InputStream content = contentLoader.load(message)) { - zipOutputStream.putArchiveEntry(createEntry(zipOutputStream, message)); - - IOUtils.copy(content, zipOutputStream); - - zipOutputStream.closeArchiveEntry(); + void putMessageToEntry(ZipArchiveOutputStream zipOutputStream, DeletedMessage message, Optional<InputStream> maybeContent) throws IOException { + if (maybeContent.isPresent()) { + try (InputStream closableMessageContent = maybeContent.get()) { + zipOutputStream.putArchiveEntry(createEntry(zipOutputStream, message)); + IOUtils.copy(closableMessageContent, zipOutputStream); + zipOutputStream.closeArchiveEntry(); + } } } diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageZipperTest.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageZipperTest.java index 2c8bfdf..efd9355 100644 --- a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageZipperTest.java +++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageZipperTest.java @@ -44,6 +44,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.Collection; +import java.util.Optional; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; @@ -60,7 +61,7 @@ import org.mockito.stubbing.Answer; import 
com.github.fge.lambdas.Throwing; class DeletedMessageZipperTest { - private static final DeletedMessageContentLoader CONTENT_LOADER = message -> new ByteArrayInputStream(CONTENT); + private static final DeletedMessageContentLoader CONTENT_LOADER = message -> Optional.of(new ByteArrayInputStream(CONTENT)); private static final String MESSAGE_CONTENT = new String(CONTENT, StandardCharsets.UTF_8); private DeletedMessageZipper zipper; @@ -183,8 +184,8 @@ class DeletedMessageZipperTest { DeletedMessageContentLoader contentLoader = spy(new DeletedMessageContentLoader() { // lambdas are final and thus can't be spied @Override - public InputStream load(DeletedMessage deletedMessage) { - return new ByteArrayInputStream(CONTENT); + public Optional<InputStream> load(DeletedMessage deletedMessage) { + return Optional.of(new ByteArrayInputStream(CONTENT)); } }); @@ -196,13 +197,24 @@ class DeletedMessageZipperTest { verify(contentLoader, times(1)).load(any()); } + + @Test + void zipShouldNotPutEntryIfContentLoaderReturnsEmptyResult() throws Exception { + DeletedMessageContentLoader contentLoader = message -> Optional.empty(); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + zipper.zip(contentLoader, Stream.of(DELETED_MESSAGE, DELETED_MESSAGE_2), outputStream); + + try (ZipAssert zipAssert = assertThatZip(outputStream)) { + zipAssert.hasNoEntry(); + } + } } private DeletedMessageZipper.DeletedMessageContentLoader spyLoadedContents(Collection<InputStream> loadedContents) { - Answer<InputStream> spyedContent = invocationOnMock -> { - InputStream result = spy(new ByteArrayInputStream(CONTENT)); - loadedContents.add(result); - return result; + Answer<Optional<InputStream>> spyedContent = invocationOnMock -> { + InputStream spied = spy(new ByteArrayInputStream(CONTENT)); + loadedContents.add(spied); + return Optional.of(spied); }; DeletedMessageContentLoader contentLoader = mock(DeletedMessageContentLoader.class); 
when(contentLoader.load(any())).thenAnswer(spyedContent); diff --git a/pom.xml b/pom.xml index 0a66da5..335423e 100644 --- a/pom.xml +++ b/pom.xml @@ -1103,12 +1103,12 @@ </dependency> <dependency> <groupId>${james.groupId}</groupId> - <artifactId>blob-export-guice</artifactId> + <artifactId>blob-export-file</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>${james.groupId}</groupId> - <artifactId>blob-export-file</artifactId> + <artifactId>blob-export-guice</artifactId> <version>${project.version}</version> </dependency> <dependency> diff --git a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultExportTask.java b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultExportTask.java index de4204b..ca551d4 100644 --- a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultExportTask.java +++ b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultExportTask.java @@ -19,6 +19,7 @@ package org.apache.james.webadmin.vault.routes; +import java.io.IOException; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; @@ -79,10 +80,9 @@ class DeletedMessagesVaultExportTask implements Task { public Result run() { try { Runnable messageToShareCallback = totalExportedMessages::incrementAndGet; - exportService.export(userExportFrom, exportQuery, exportTo, messageToShareCallback) - .block(); + exportService.export(userExportFrom, exportQuery, exportTo, messageToShareCallback); return Result.COMPLETED; - } catch (Exception e) { + } catch (IOException e) { LOGGER.error("Error happens when exporting deleted messages from {} to {}", userExportFrom.asString(), exportTo.asString()); return 
Result.PARTIAL; } diff --git a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/ExportService.java b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/ExportService.java index 2e3c014..8dc77d1 100644 --- a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/ExportService.java +++ b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/ExportService.java @@ -19,19 +19,11 @@ package org.apache.james.webadmin.vault.routes; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Files; -import java.util.Collection; -import java.util.UUID; -import java.util.function.Function; -import java.util.stream.Stream; import javax.inject.Inject; +import org.apache.commons.io.FileUtils; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.BlobStore; import org.apache.james.blob.export.api.BlobExportMechanism; @@ -42,29 +34,14 @@ import org.apache.james.vault.DeletedMessageVault; import org.apache.james.vault.DeletedMessageZipper; import org.apache.james.vault.search.Query; -import com.github.fge.lambdas.Throwing; -import com.github.fge.lambdas.functions.ThrowingFunction; -import com.github.steveash.guavate.Guavate; import com.google.common.annotations.VisibleForTesting; +import com.google.common.io.ByteSource; +import com.google.common.io.FileBackedOutputStream; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; class ExportService { - - private class ZippedData { - private final long contentLength; - private final InputStream inputStream; - - private ZippedData(long contentLength, InputStream content) { - this.contentLength 
= contentLength; - this.inputStream = content; - } - } - - private static final String TEMPORARY_FILE_EXTENSION = ".temp"; - private final BlobExportMechanism blobExport; private final BlobStore blobStore; private final DeletedMessageZipper zipper; @@ -79,53 +56,31 @@ class ExportService { this.vault = vault; } - Mono<Void> export(User user, Query exportQuery, MailAddress exportToAddress, - Runnable messageToExportCallback) { + void export(User user, Query exportQuery, MailAddress exportToAddress, Runnable messageToExportCallback) throws IOException { + Flux<DeletedMessage> matchedMessages = Flux.from(vault.search(user, exportQuery)) + .doOnNext(any -> messageToExportCallback.run()); - return matchingMessages(user, exportQuery) - .doOnNext(any -> messageToExportCallback.run()) - .collect(Guavate.toImmutableList()) - .map(Collection::stream) - .flatMap(messages -> Mono.fromCallable(() -> zipData(user, messages))) - .flatMap(sneakyThrow(zippedData -> blobStore.save(zippedData.inputStream, zippedData.contentLength))) - .flatMap(blobId -> exportTo(user, exportToAddress, blobId)) - .then(); - } + BlobId blobId = zipToBlob(user, matchedMessages); - private Flux<DeletedMessage> matchingMessages(User user, Query exportQuery) { - return Flux.from(vault.search(user, exportQuery)) - .publishOn(Schedulers.elastic()); - } - - private ZippedData zipData(User user, Stream<DeletedMessage> messages) throws IOException { - File tempFile = temporaryFile(); - FileOutputStream fileOutputStream = new FileOutputStream(tempFile); - - zipper.zip(message -> loadMessageContent(user, message), messages, fileOutputStream); - return new ZippedData(tempFile.length(), new FileInputStream(tempFile)); - } - - private File temporaryFile() throws IOException { - String tempFileName = UUID.randomUUID().toString(); - File tempFile = Files.createTempFile(tempFileName, TEMPORARY_FILE_EXTENSION).toFile(); - tempFile.deleteOnExit(); - return tempFile; + blobExport.blobId(blobId) + .with(exportToAddress) + 
.explanation(exportMessage(user)) + .export(); } - private InputStream loadMessageContent(User user, DeletedMessage message) { - return Mono.from(vault.loadMimeMessage(user, message.getMessageId())) - .block(); + private BlobId zipToBlob(User user, Flux<DeletedMessage> messages) throws IOException { + try (FileBackedOutputStream fileOutputStream = new FileBackedOutputStream(FileUtils.ONE_MB_BI.intValue())) { + zipper.zip(contentLoader(user), messages.toStream(), fileOutputStream); + ByteSource byteSource = fileOutputStream.asByteSource(); + return blobStore.save(byteSource.openStream(), byteSource.size()).block(); + } } - private Mono<Void> exportTo(User user, MailAddress exportToAddress, BlobId blobId) { - return Mono.fromRunnable(() -> blobExport - .blobId(blobId) - .with(exportToAddress) - .explanation(String.format("Some deleted messages from user %s has been shared to you", user.asString())) - .export()); + private DeletedMessageZipper.DeletedMessageContentLoader contentLoader(User user) { + return message -> Mono.from(vault.loadMimeMessage(user, message.getMessageId())).blockOptional(); } - private <T, R> Function<T, R> sneakyThrow(ThrowingFunction<T, R> function) { - return Throwing.function(function).sneakyThrow(); + private String exportMessage(User user) { + return String.format("Some deleted messages from user %s has been shared to you", user.asString()); } } diff --git a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java index fcc563f..82f84be 100644 --- a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java +++ 
b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java @@ -58,6 +58,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.List; +import java.util.Optional; import java.util.stream.Stream; import org.apache.commons.configuration.DefaultConfigurationBuilder; @@ -1839,7 +1840,7 @@ class DeletedMessagesVaultRoutesTest { private byte[] zippedMessagesData() throws IOException { ByteArrayOutputStream expectedZippedData = new ByteArrayOutputStream(); - zipper.zip(message -> new ByteArrayInputStream(CONTENT), + zipper.zip(message -> Optional.of(new ByteArrayInputStream(CONTENT)), Stream.of(DELETED_MESSAGE, DELETED_MESSAGE_2), expectedZippedData); return expectedZippedData.toByteArray(); --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org