This is an automated email from the ASF dual-hosted git repository.

peterlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-compress.git


The following commit(s) were added to refs/heads/master by this push:
     new 7457bbb  COMPRESS-504 allow setting of compression level in 
ParallelScatterZipCreator
7457bbb is described below

commit 7457bbbdf76e6a617cdfe2f9314aea540be8db3c
Author: PeterAlfredLee <peteralfred...@gmail.com>
AuthorDate: Mon Mar 16 19:41:20 2020 +0800

    COMPRESS-504 allow setting of compression level in ParallelScatterZipCreator
    
    Allow setting of compression level in ParallelScatterZipCreator, and add 
some testcases for ParallelScatterZipCreator
---
 .../archivers/zip/ParallelScatterZipCreator.java   |  25 ++-
 .../zip/ParallelScatterZipCreatorTest.java         | 171 ++++++++++++++++++++-
 2 files changed, 193 insertions(+), 3 deletions(-)

diff --git 
a/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
 
b/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
index dfa8524..7cd4c23 100644
--- 
a/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
+++ 
b/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java
@@ -59,6 +59,7 @@ public class ParallelScatterZipCreator {
     private final long startedAt = System.currentTimeMillis();
     private long compressionDoneAt = 0;
     private long scatterDoneAt;
+    private final int compressionLevel;
 
     private static class DefaultBackingStoreSupplier implements 
ScatterGatherBackingStoreSupplier {
         final AtomicInteger storeNum = new AtomicInteger(0);
@@ -74,7 +75,7 @@ public class ParallelScatterZipCreator {
             throws IOException {
         final ScatterGatherBackingStore bs = 
scatterGatherBackingStoreSupplier.get();
         // lifecycle is bound to the ScatterZipOutputStream returned
-        final StreamCompressor sc = 
StreamCompressor.create(Deflater.DEFAULT_COMPRESSION, bs); //NOSONAR
+        final StreamCompressor sc = StreamCompressor.create(compressionLevel, 
bs); //NOSONAR
         return new ScatterZipOutputStream(bs, sc);
     }
 
@@ -118,8 +119,30 @@ public class ParallelScatterZipCreator {
      */
     public ParallelScatterZipCreator(final ExecutorService executorService,
                                      final ScatterGatherBackingStoreSupplier 
backingStoreSupplier) {
+        this(executorService, backingStoreSupplier, 
Deflater.DEFAULT_COMPRESSION);
+    }
+
+    /**
+     * Create a ParallelScatterZipCreator
+     *
+     * @param executorService      The executorService to use. For technical 
reasons, this will be shut down
+     *                             by this class.
+     * @param backingStoreSupplier The supplier of backing store which shall 
be used
+     * @param compressionLevel     The compression level used in compression, 
this value should be
+     *                             -1(default level) or between 0~9.
+     * @throws IllegalArgumentException if the compression level is illegal
+     */
+    public ParallelScatterZipCreator(final ExecutorService executorService,
+                                     final ScatterGatherBackingStoreSupplier 
backingStoreSupplier,
+                                     final int compressionLevel) throws 
IllegalArgumentException {
+        if ((compressionLevel < Deflater.NO_COMPRESSION || compressionLevel > 
Deflater.BEST_COMPRESSION)
+                && compressionLevel != Deflater.DEFAULT_COMPRESSION) {
+            throw new IllegalArgumentException("Compression level is expected 
between -1~9");
+        }
+
         this.backingStoreSupplier = backingStoreSupplier;
         es = executorService;
+        this.compressionLevel = compressionLevel;
     }
 
     /**
diff --git 
a/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java
 
b/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java
index f2417a9..3b1111a 100644
--- 
a/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java
+++ 
b/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java
@@ -27,6 +27,8 @@ import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Enumeration;
@@ -35,14 +37,21 @@ import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.zip.Deflater;
 import java.util.zip.ZipEntry;
 
+import static org.apache.commons.compress.AbstractTestCase.getFile;
 import static org.apache.commons.compress.AbstractTestCase.tryHardToDelete;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class ParallelScatterZipCreatorTest {
 
     private final int NUMITEMS = 5000;
+    private static final long EXPECTED_FILE_SIZE = 1024 * 1024; // 1MB
+    private static final int EXPECTED_FILES_NUMBER = 50;
 
     private File result;
     private File tmp;
@@ -101,7 +110,71 @@ public class ParallelScatterZipCreatorTest {
         });
     }
 
+    @Test(expected = IllegalArgumentException.class)
+    public void throwsExceptionWithCompressionLevelTooBig() throws Exception {
+        final int compressLevelTooBig = Deflater.BEST_COMPRESSION + 1;
+        final ExecutorService es = Executors.newFixedThreadPool(1);
+        final ScatterGatherBackingStoreSupplier supp = new 
ScatterGatherBackingStoreSupplier() {
+            @Override
+            public ScatterGatherBackingStore get() throws IOException {
+                return new FileBasedScatterGatherBackingStore(tmp = 
File.createTempFile("parallelscatter", "n1"));
+            }
+        };
+
+        new ParallelScatterZipCreator(es, supp, compressLevelTooBig);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void throwsExceptionWithCompressionLevelTooSmall() throws Exception 
{
+        final int compressLevelTooSmall = Deflater.DEFAULT_COMPRESSION - 1;
+        final ExecutorService es = Executors.newFixedThreadPool(1);
+        final ScatterGatherBackingStoreSupplier supp = new 
ScatterGatherBackingStoreSupplier() {
+            @Override
+            public ScatterGatherBackingStore get() throws IOException {
+                return new FileBasedScatterGatherBackingStore(tmp = 
File.createTempFile("parallelscatter", "n1"));
+            }
+        };
+
+        new ParallelScatterZipCreator(es, supp, compressLevelTooSmall);
+    }
+
+    @Test
+    public void callableWithLowestLevelApiUsingSubmit() throws Exception {
+        result = File.createTempFile("parallelScatterGather4", "");
+        callableApiWithTestFiles(new CallableConsumerSupplier() {
+            @Override
+            public CallableConsumer apply(final ParallelScatterZipCreator 
zipCreator) {
+                return new CallableConsumer() {
+                    @Override
+                    public void accept(Callable<? extends 
ScatterZipOutputStream> c) {
+                        zipCreator.submit(c);
+                    }
+                };
+            }
+        }, Deflater.NO_COMPRESSION);
+    }
+
+    @Test
+    public void callableApiWithHighestLevelUsingSubmitStreamAwareCallable() 
throws Exception {
+        result = File.createTempFile("parallelScatterGather5", "");
+        callableApiWithTestFiles(new CallableConsumerSupplier() {
+            @Override
+            public CallableConsumer apply(final ParallelScatterZipCreator 
zipCreator) {
+                return new CallableConsumer() {
+                    @Override
+                    public void accept(Callable<? extends 
ScatterZipOutputStream> c) {
+                        zipCreator.submitStreamAwareCallable(c);
+                    }
+                };
+            }
+        }, Deflater.BEST_COMPRESSION);
+    }
+
     private void callableApi(CallableConsumerSupplier consumerSupplier) throws 
Exception {
+        callableApi(consumerSupplier, Deflater.DEFAULT_COMPRESSION);
+    }
+
+    private void callableApi(CallableConsumerSupplier consumerSupplier, int 
compressionLevel) throws Exception {
         final ZipArchiveOutputStream zos = new ZipArchiveOutputStream(result);
         zos.setEncoding("UTF-8");
         final ExecutorService es = Executors.newFixedThreadPool(1);
@@ -113,7 +186,7 @@ public class ParallelScatterZipCreatorTest {
             }
         };
 
-        final ParallelScatterZipCreator zipCreator = new 
ParallelScatterZipCreator(es, supp);
+        final ParallelScatterZipCreator zipCreator = new 
ParallelScatterZipCreator(es, supp, compressionLevel);
         final Map<String, byte[]> entries = writeEntriesAsCallable(zipCreator, 
consumerSupplier.apply(zipCreator));
         zipCreator.writeTo(zos);
         zos.close();
@@ -124,6 +197,37 @@ public class ParallelScatterZipCreatorTest {
         assertNotNull(zipCreator.getStatisticsMessage());
     }
 
+    private void callableApiWithTestFiles(CallableConsumerSupplier 
consumerSupplier, int compressionLevel) throws Exception {
+        final ZipArchiveOutputStream zos = new ZipArchiveOutputStream(result);
+        zos.setEncoding("UTF-8");
+        final ExecutorService es = Executors.newFixedThreadPool(1);
+
+        final ScatterGatherBackingStoreSupplier supp = new 
ScatterGatherBackingStoreSupplier() {
+            @Override
+            public ScatterGatherBackingStore get() throws IOException {
+                return new FileBasedScatterGatherBackingStore(tmp = 
File.createTempFile("parallelscatter", "n1"));
+            }
+        };
+
+        final ParallelScatterZipCreator zipCreator = new 
ParallelScatterZipCreator(es, supp, compressionLevel);
+        final Map<String, byte[]> entries = 
writeTestFilesAsCallable(zipCreator, consumerSupplier.apply(zipCreator));
+        zipCreator.writeTo(zos);
+        zos.close();
+
+        // validate the content of the compressed files
+        try (final ZipFile zf = new ZipFile(result)) {
+            final Enumeration<ZipArchiveEntry> entriesInPhysicalOrder = 
zf.getEntriesInPhysicalOrder();
+            while (entriesInPhysicalOrder.hasMoreElements()) {
+                final ZipArchiveEntry zipArchiveEntry = 
entriesInPhysicalOrder.nextElement();
+                final InputStream inputStream = 
zf.getInputStream(zipArchiveEntry);
+                final byte[] actual = IOUtils.toByteArray(inputStream);
+                final byte[] expected = 
entries.remove(zipArchiveEntry.getName());
+                assertArrayEquals("For " + zipArchiveEntry.getName(), 
expected, actual);
+            }
+        }
+        assertNotNull(zipCreator.getStatisticsMessage());
+    }
+
     private void removeEntriesFoundInZipFile(final File result, final 
Map<String, byte[]> entries) throws IOException {
         final ZipFile zf = new ZipFile(result);
         final Enumeration<ZipArchiveEntry> entriesInPhysicalOrder = 
zf.getEntriesInPhysicalOrder();
@@ -196,6 +300,69 @@ public class ParallelScatterZipCreatorTest {
         return entries;
     }
 
+    /**
+     * Try to compress the files in src/test/resources with size no bigger than
+     * {@value EXPECTED_FILES_NUMBER} and with a mount of files no bigger than
+     * {@value EXPECTED_FILES_NUMBER}
+     *
+     * @param zipCreator The ParallelScatterZipCreator
+     * @param consumer   The parallel consumer
+     * @return A map using file name as key and file content as value
+     * @throws IOException if exceptions occur when opening files
+     */
+    private Map<String, byte[]> writeTestFilesAsCallable(final 
ParallelScatterZipCreator zipCreator,
+                                                         final 
CallableConsumer consumer) throws IOException {
+        final Map<String, byte[]> entries = new HashMap<>();
+        File baseDir = getFile("");
+        int filesCount = 0;
+        for (final File file : baseDir.listFiles()) {
+            // do not compress too many files
+            if (filesCount >= EXPECTED_FILES_NUMBER) {
+                break;
+            }
+
+            // skip files that are too large
+            if (file.isDirectory() || file.length() > EXPECTED_FILE_SIZE) {
+                continue;
+            }
+
+            entries.put(file.getName(), IOUtils.toByteArray(new 
FileInputStream(file)));
+
+            final ZipArchiveEntry zipArchiveEntry = new 
ZipArchiveEntry(file.getName());
+            zipArchiveEntry.setMethod(ZipEntry.DEFLATED);
+            zipArchiveEntry.setSize(file.length());
+            zipArchiveEntry.setUnixMode(UnixStat.FILE_FLAG | 0664);
+
+            final InputStreamSupplier iss = new InputStreamSupplier() {
+                @Override
+                public InputStream get() {
+                    try {
+                        return new FileInputStream(file);
+                    } catch (FileNotFoundException e) {
+                        return null;
+                    }
+                }
+            };
+
+            final Callable<ScatterZipOutputStream> callable;
+            if (filesCount % 2 == 0) {
+                callable = zipCreator.createCallable(zipArchiveEntry, iss);
+            } else {
+                final ZipArchiveEntryRequestSupplier zaSupplier = new 
ZipArchiveEntryRequestSupplier() {
+                    @Override
+                    public ZipArchiveEntryRequest get() {
+                        return 
ZipArchiveEntryRequest.createZipArchiveEntryRequest(zipArchiveEntry, iss);
+                    }
+                };
+                callable = zipCreator.createCallable(zaSupplier);
+            }
+
+            consumer.accept(callable);
+            filesCount++;
+        }
+        return entries;
+    }
+
     private ZipArchiveEntry createZipArchiveEntry(final Map<String, byte[]> 
entries, final int i, final byte[] payloadBytes) {
         final ZipArchiveEntry za = new ZipArchiveEntry( "file" + i);
         entries.put( za.getName(), payloadBytes);

Reply via email to