This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 2aa3780338026e45290d2fe2627279fafef8d6ba
Author: Tran Tien Duc <dt...@linagora.com>
AuthorDate: Tue Mar 26 19:24:34 2019 +0700

    JAMES-2685 Zipper.ContentLoader should return Optional<InputStream>
    
    To make the zipper aware of the emptiness of the content of deleted messages
---
 .../apache/james/vault/DeletedMessageZipper.java   | 20 ++---
 .../james/vault/DeletedMessageZipperTest.java      | 26 +++++--
 pom.xml                                            |  4 +-
 .../routes/DeletedMessagesVaultExportTask.java     |  6 +-
 .../james/webadmin/vault/routes/ExportService.java | 87 ++++++----------------
 .../routes/DeletedMessagesVaultRoutesTest.java     |  3 +-
 6 files changed, 58 insertions(+), 88 deletions(-)

diff --git 
a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageZipper.java
 
b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageZipper.java
index 22e3fd2..1d4a89b 100644
--- 
a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageZipper.java
+++ 
b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageZipper.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Optional;
 import java.util.stream.Stream;
 
 import org.apache.commons.compress.archivers.zip.ExtraFieldUtils;
@@ -39,7 +40,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 public class DeletedMessageZipper {
     public interface DeletedMessageContentLoader {
-        InputStream load(DeletedMessage deletedMessage);
+        Optional<InputStream> load(DeletedMessage deletedMessage);
     }
 
     public DeletedMessageZipper() {
@@ -49,7 +50,8 @@ public class DeletedMessageZipper {
 
     public void zip(DeletedMessageContentLoader contentLoader, 
Stream<DeletedMessage> deletedMessages, OutputStream outputStream) throws 
IOException {
         try (ZipArchiveOutputStream zipOutputStream = 
newZipArchiveOutputStream(outputStream)) {
-            ThrowingConsumer<DeletedMessage> putInZip = message -> 
putMessageToEntry(zipOutputStream, message, contentLoader);
+            ThrowingConsumer<DeletedMessage> putInZip =
+                message -> putMessageToEntry(zipOutputStream, message, 
contentLoader.load(message));
 
             deletedMessages.forEach(Throwing.consumer(putInZip).sneakyThrow());
 
@@ -63,13 +65,13 @@ public class DeletedMessageZipper {
     }
 
     @VisibleForTesting
-    void putMessageToEntry(ZipArchiveOutputStream zipOutputStream, 
DeletedMessage message, DeletedMessageContentLoader contentLoader) throws 
IOException {
-        try (InputStream content = contentLoader.load(message)) {
-            zipOutputStream.putArchiveEntry(createEntry(zipOutputStream, 
message));
-
-            IOUtils.copy(content, zipOutputStream);
-
-            zipOutputStream.closeArchiveEntry();
+    void putMessageToEntry(ZipArchiveOutputStream zipOutputStream, 
DeletedMessage message, Optional<InputStream> maybeContent) throws IOException {
+        if (maybeContent.isPresent()) {
+            try (InputStream closableMessageContent = maybeContent.get()) {
+                zipOutputStream.putArchiveEntry(createEntry(zipOutputStream, 
message));
+                IOUtils.copy(closableMessageContent, zipOutputStream);
+                zipOutputStream.closeArchiveEntry();
+            }
         }
     }
 
diff --git 
a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageZipperTest.java
 
b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageZipperTest.java
index 2c8bfdf..efd9355 100644
--- 
a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageZipperTest.java
+++ 
b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageZipperTest.java
@@ -44,6 +44,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Stream;
@@ -60,7 +61,7 @@ import org.mockito.stubbing.Answer;
 import com.github.fge.lambdas.Throwing;
 
 class DeletedMessageZipperTest {
-    private static final DeletedMessageContentLoader CONTENT_LOADER = message 
-> new ByteArrayInputStream(CONTENT);
+    private static final DeletedMessageContentLoader CONTENT_LOADER = message 
-> Optional.of(new ByteArrayInputStream(CONTENT));
     private static final String MESSAGE_CONTENT = new String(CONTENT, 
StandardCharsets.UTF_8);
     private DeletedMessageZipper zipper;
 
@@ -183,8 +184,8 @@ class DeletedMessageZipperTest {
             DeletedMessageContentLoader contentLoader = spy(new 
DeletedMessageContentLoader() {
                 // lambdas are final and thus can't be spied
                 @Override
-                public InputStream load(DeletedMessage deletedMessage) {
-                    return new ByteArrayInputStream(CONTENT);
+                public Optional<InputStream> load(DeletedMessage 
deletedMessage) {
+                    return Optional.of(new ByteArrayInputStream(CONTENT));
                 }
             });
 
@@ -196,13 +197,24 @@ class DeletedMessageZipperTest {
 
             verify(contentLoader, times(1)).load(any());
         }
+
+        @Test
+        void zipShouldNotPutEntryIfContentLoaderReturnsEmptyResult() throws 
Exception {
+            DeletedMessageContentLoader contentLoader = message -> 
Optional.empty();
+            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+            zipper.zip(contentLoader, Stream.of(DELETED_MESSAGE, 
DELETED_MESSAGE_2), outputStream);
+
+            try (ZipAssert zipAssert = assertThatZip(outputStream)) {
+                zipAssert.hasNoEntry();
+            }
+        }
     }
 
     private DeletedMessageZipper.DeletedMessageContentLoader 
spyLoadedContents(Collection<InputStream> loadedContents) {
-        Answer<InputStream> spyedContent = invocationOnMock -> {
-            InputStream result = spy(new ByteArrayInputStream(CONTENT));
-            loadedContents.add(result);
-            return result;
+        Answer<Optional<InputStream>> spyedContent = invocationOnMock -> {
+            InputStream spied = spy(new ByteArrayInputStream(CONTENT));
+            loadedContents.add(spied);
+            return Optional.of(spied);
         };
         DeletedMessageContentLoader contentLoader = 
mock(DeletedMessageContentLoader.class);
         when(contentLoader.load(any())).thenAnswer(spyedContent);
diff --git a/pom.xml b/pom.xml
index 0a66da5..335423e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1103,12 +1103,12 @@
             </dependency>
             <dependency>
                 <groupId>${james.groupId}</groupId>
-                <artifactId>blob-export-guice</artifactId>
+                <artifactId>blob-export-file</artifactId>
                 <version>${project.version}</version>
             </dependency>
             <dependency>
                 <groupId>${james.groupId}</groupId>
-                <artifactId>blob-export-file</artifactId>
+                <artifactId>blob-export-guice</artifactId>
                 <version>${project.version}</version>
             </dependency>
             <dependency>
diff --git 
a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultExportTask.java
 
b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultExportTask.java
index de4204b..ca551d4 100644
--- 
a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultExportTask.java
+++ 
b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultExportTask.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.webadmin.vault.routes;
 
+import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -79,10 +80,9 @@ class DeletedMessagesVaultExportTask implements Task {
     public Result run() {
         try {
             Runnable messageToShareCallback = 
totalExportedMessages::incrementAndGet;
-            exportService.export(userExportFrom, exportQuery, exportTo, 
messageToShareCallback)
-                .block();
+            exportService.export(userExportFrom, exportQuery, exportTo, 
messageToShareCallback);
             return Result.COMPLETED;
-        } catch (Exception e) {
+        } catch (IOException e) {
             LOGGER.error("Error happens when exporting deleted messages from 
{} to {}", userExportFrom.asString(), exportTo.asString());
             return Result.PARTIAL;
         }
diff --git 
a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/ExportService.java
 
b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/ExportService.java
index 2e3c014..8dc77d1 100644
--- 
a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/ExportService.java
+++ 
b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/main/java/org/apache/james/webadmin/vault/routes/ExportService.java
@@ -19,19 +19,11 @@
 
 package org.apache.james.webadmin.vault.routes;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.util.Collection;
-import java.util.UUID;
-import java.util.function.Function;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.blob.export.api.BlobExportMechanism;
@@ -42,29 +34,14 @@ import org.apache.james.vault.DeletedMessageVault;
 import org.apache.james.vault.DeletedMessageZipper;
 import org.apache.james.vault.search.Query;
 
-import com.github.fge.lambdas.Throwing;
-import com.github.fge.lambdas.functions.ThrowingFunction;
-import com.github.steveash.guavate.Guavate;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.ByteSource;
+import com.google.common.io.FileBackedOutputStream;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
 
 class ExportService {
-
-    private class ZippedData {
-        private final long contentLength;
-        private final InputStream inputStream;
-
-        private ZippedData(long contentLength, InputStream content) {
-            this.contentLength = contentLength;
-            this.inputStream = content;
-        }
-    }
-
-    private static final String TEMPORARY_FILE_EXTENSION = ".temp";
-
     private final BlobExportMechanism blobExport;
     private final BlobStore blobStore;
     private final DeletedMessageZipper zipper;
@@ -79,53 +56,31 @@ class ExportService {
         this.vault = vault;
     }
 
-    Mono<Void> export(User user, Query exportQuery, MailAddress 
exportToAddress,
-                      Runnable messageToExportCallback) {
+    void export(User user, Query exportQuery, MailAddress exportToAddress, 
Runnable messageToExportCallback) throws IOException {
+        Flux<DeletedMessage> matchedMessages = Flux.from(vault.search(user, 
exportQuery))
+            .doOnNext(any -> messageToExportCallback.run());
 
-        return matchingMessages(user, exportQuery)
-            .doOnNext(any -> messageToExportCallback.run())
-            .collect(Guavate.toImmutableList())
-            .map(Collection::stream)
-            .flatMap(messages -> Mono.fromCallable(() -> zipData(user, 
messages)))
-            .flatMap(sneakyThrow(zippedData -> 
blobStore.save(zippedData.inputStream, zippedData.contentLength)))
-            .flatMap(blobId -> exportTo(user, exportToAddress, blobId))
-            .then();
-    }
+        BlobId blobId = zipToBlob(user, matchedMessages);
 
-    private Flux<DeletedMessage> matchingMessages(User user, Query 
exportQuery) {
-        return Flux.from(vault.search(user, exportQuery))
-            .publishOn(Schedulers.elastic());
-    }
-
-    private ZippedData zipData(User user, Stream<DeletedMessage> messages) 
throws IOException {
-        File tempFile = temporaryFile();
-        FileOutputStream fileOutputStream = new FileOutputStream(tempFile);
-
-        zipper.zip(message -> loadMessageContent(user, message), messages, 
fileOutputStream);
-        return new ZippedData(tempFile.length(), new 
FileInputStream(tempFile));
-    }
-
-    private File temporaryFile() throws IOException {
-        String tempFileName = UUID.randomUUID().toString();
-        File tempFile = Files.createTempFile(tempFileName, 
TEMPORARY_FILE_EXTENSION).toFile();
-        tempFile.deleteOnExit();
-        return tempFile;
+        blobExport.blobId(blobId)
+            .with(exportToAddress)
+            .explanation(exportMessage(user))
+            .export();
     }
 
-    private InputStream loadMessageContent(User user, DeletedMessage message) {
-        return Mono.from(vault.loadMimeMessage(user, message.getMessageId()))
-            .block();
+    private BlobId zipToBlob(User user, Flux<DeletedMessage> messages) throws 
IOException {
+        try (FileBackedOutputStream fileOutputStream = new 
FileBackedOutputStream(FileUtils.ONE_MB_BI.intValue())) {
+            zipper.zip(contentLoader(user), messages.toStream(), 
fileOutputStream);
+            ByteSource byteSource = fileOutputStream.asByteSource();
+            return blobStore.save(byteSource.openStream(), 
byteSource.size()).block();
+        }
     }
 
-    private Mono<Void> exportTo(User user, MailAddress exportToAddress, BlobId 
blobId) {
-        return Mono.fromRunnable(() -> blobExport
-            .blobId(blobId)
-            .with(exportToAddress)
-            .explanation(String.format("Some deleted messages from user %s has 
been shared to you", user.asString()))
-            .export());
+    private DeletedMessageZipper.DeletedMessageContentLoader 
contentLoader(User user) {
+        return message -> Mono.from(vault.loadMimeMessage(user, 
message.getMessageId())).blockOptional();
     }
 
-    private <T, R> Function<T, R> sneakyThrow(ThrowingFunction<T, R> function) 
{
-        return Throwing.function(function).sneakyThrow();
+    private String exportMessage(User user) {
+        return String.format("Some deleted messages from user %s has been 
shared to you", user.asString());
     }
 }
diff --git 
a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java
 
b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java
index fcc563f..82f84be 100644
--- 
a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java
+++ 
b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java
@@ -58,6 +58,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Stream;
 
 import org.apache.commons.configuration.DefaultConfigurationBuilder;
@@ -1839,7 +1840,7 @@ class DeletedMessagesVaultRoutesTest {
 
         private byte[] zippedMessagesData() throws IOException {
             ByteArrayOutputStream expectedZippedData = new 
ByteArrayOutputStream();
-            zipper.zip(message -> new ByteArrayInputStream(CONTENT),
+            zipper.zip(message -> Optional.of(new 
ByteArrayInputStream(CONTENT)),
                 Stream.of(DELETED_MESSAGE, DELETED_MESSAGE_2),
                 expectedZippedData);
             return expectedZippedData.toByteArray();


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to