This is an automated email from the ASF dual-hosted git repository. peterlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/commons-compress.git
The following commit(s) were added to refs/heads/master by this push: new 7457bbb COMPRESS-504 allow setting of compression level in ParallelScatterZipCreator 7457bbb is described below commit 7457bbbdf76e6a617cdfe2f9314aea540be8db3c Author: PeterAlfredLee <peteralfred...@gmail.com> AuthorDate: Mon Mar 16 19:41:20 2020 +0800 COMPRESS-504 allow setting of compression level in ParallelScatterZipCreator Allow setting of compression level in ParallelScatterZipCreator, and add some testcases for ParallelScatterZipCreator --- .../archivers/zip/ParallelScatterZipCreator.java | 25 ++- .../zip/ParallelScatterZipCreatorTest.java | 171 ++++++++++++++++++++- 2 files changed, 193 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java b/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java index dfa8524..7cd4c23 100644 --- a/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java +++ b/src/main/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreator.java @@ -59,6 +59,7 @@ public class ParallelScatterZipCreator { private final long startedAt = System.currentTimeMillis(); private long compressionDoneAt = 0; private long scatterDoneAt; + private final int compressionLevel; private static class DefaultBackingStoreSupplier implements ScatterGatherBackingStoreSupplier { final AtomicInteger storeNum = new AtomicInteger(0); @@ -74,7 +75,7 @@ public class ParallelScatterZipCreator { throws IOException { final ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get(); // lifecycle is bound to the ScatterZipOutputStream returned - final StreamCompressor sc = StreamCompressor.create(Deflater.DEFAULT_COMPRESSION, bs); //NOSONAR + final StreamCompressor sc = StreamCompressor.create(compressionLevel, bs); //NOSONAR return new ScatterZipOutputStream(bs, sc); } @@ -118,8 +119,30 @@ public class ParallelScatterZipCreator { */ 
public ParallelScatterZipCreator(final ExecutorService executorService, final ScatterGatherBackingStoreSupplier backingStoreSupplier) { + this(executorService, backingStoreSupplier, Deflater.DEFAULT_COMPRESSION); + } + + /** + * Create a ParallelScatterZipCreator + * + * @param executorService The executorService to use. For technical reasons, this will be shut down + * by this class. + * @param backingStoreSupplier The supplier of backing store which shall be used + * @param compressionLevel The compression level used in compression, this value should be + * -1(default level) or between 0~9. + * @throws IllegalArgumentException if the compression level is illegal + */ + public ParallelScatterZipCreator(final ExecutorService executorService, + final ScatterGatherBackingStoreSupplier backingStoreSupplier, + final int compressionLevel) throws IllegalArgumentException { + if ((compressionLevel < Deflater.NO_COMPRESSION || compressionLevel > Deflater.BEST_COMPRESSION) + && compressionLevel != Deflater.DEFAULT_COMPRESSION) { + throw new IllegalArgumentException("Compression level is expected between -1~9"); + } + this.backingStoreSupplier = backingStoreSupplier; es = executorService; + this.compressionLevel = compressionLevel; } /** diff --git a/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java b/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java index f2417a9..3b1111a 100644 --- a/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java +++ b/src/test/java/org/apache/commons/compress/archivers/zip/ParallelScatterZipCreatorTest.java @@ -27,6 +27,8 @@ import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.util.Enumeration; @@ -35,14 +37,21 @@ import java.util.Map; import 
java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.zip.Deflater; import java.util.zip.ZipEntry; +import static org.apache.commons.compress.AbstractTestCase.getFile; import static org.apache.commons.compress.AbstractTestCase.tryHardToDelete; -import static org.junit.Assert.*; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class ParallelScatterZipCreatorTest { private final int NUMITEMS = 5000; + private static final long EXPECTED_FILE_SIZE = 1024 * 1024; // 1MB + private static final int EXPECTED_FILES_NUMBER = 50; private File result; private File tmp; @@ -101,7 +110,71 @@ public class ParallelScatterZipCreatorTest { }); } + @Test(expected = IllegalArgumentException.class) + public void throwsExceptionWithCompressionLevelTooBig() throws Exception { + final int compressLevelTooBig = Deflater.BEST_COMPRESSION + 1; + final ExecutorService es = Executors.newFixedThreadPool(1); + final ScatterGatherBackingStoreSupplier supp = new ScatterGatherBackingStoreSupplier() { + @Override + public ScatterGatherBackingStore get() throws IOException { + return new FileBasedScatterGatherBackingStore(tmp = File.createTempFile("parallelscatter", "n1")); + } + }; + + new ParallelScatterZipCreator(es, supp, compressLevelTooBig); + } + + @Test(expected = IllegalArgumentException.class) + public void throwsExceptionWithCompressionLevelTooSmall() throws Exception { + final int compressLevelTooSmall = Deflater.DEFAULT_COMPRESSION - 1; + final ExecutorService es = Executors.newFixedThreadPool(1); + final ScatterGatherBackingStoreSupplier supp = new ScatterGatherBackingStoreSupplier() { + @Override + public ScatterGatherBackingStore get() throws IOException { + return new FileBasedScatterGatherBackingStore(tmp = File.createTempFile("parallelscatter", "n1")); + } + }; 
+ + new ParallelScatterZipCreator(es, supp, compressLevelTooSmall); + } + + @Test + public void callableWithLowestLevelApiUsingSubmit() throws Exception { + result = File.createTempFile("parallelScatterGather4", ""); + callableApiWithTestFiles(new CallableConsumerSupplier() { + @Override + public CallableConsumer apply(final ParallelScatterZipCreator zipCreator) { + return new CallableConsumer() { + @Override + public void accept(Callable<? extends ScatterZipOutputStream> c) { + zipCreator.submit(c); + } + }; + } + }, Deflater.NO_COMPRESSION); + } + + @Test + public void callableApiWithHighestLevelUsingSubmitStreamAwareCallable() throws Exception { + result = File.createTempFile("parallelScatterGather5", ""); + callableApiWithTestFiles(new CallableConsumerSupplier() { + @Override + public CallableConsumer apply(final ParallelScatterZipCreator zipCreator) { + return new CallableConsumer() { + @Override + public void accept(Callable<? extends ScatterZipOutputStream> c) { + zipCreator.submitStreamAwareCallable(c); + } + }; + } + }, Deflater.BEST_COMPRESSION); + } + private void callableApi(CallableConsumerSupplier consumerSupplier) throws Exception { + callableApi(consumerSupplier, Deflater.DEFAULT_COMPRESSION); + } + + private void callableApi(CallableConsumerSupplier consumerSupplier, int compressionLevel) throws Exception { final ZipArchiveOutputStream zos = new ZipArchiveOutputStream(result); zos.setEncoding("UTF-8"); final ExecutorService es = Executors.newFixedThreadPool(1); @@ -113,7 +186,7 @@ public class ParallelScatterZipCreatorTest { } }; - final ParallelScatterZipCreator zipCreator = new ParallelScatterZipCreator(es, supp); + final ParallelScatterZipCreator zipCreator = new ParallelScatterZipCreator(es, supp, compressionLevel); final Map<String, byte[]> entries = writeEntriesAsCallable(zipCreator, consumerSupplier.apply(zipCreator)); zipCreator.writeTo(zos); zos.close(); @@ -124,6 +197,37 @@ public class ParallelScatterZipCreatorTest { 
assertNotNull(zipCreator.getStatisticsMessage()); } + private void callableApiWithTestFiles(CallableConsumerSupplier consumerSupplier, int compressionLevel) throws Exception { + final ZipArchiveOutputStream zos = new ZipArchiveOutputStream(result); + zos.setEncoding("UTF-8"); + final ExecutorService es = Executors.newFixedThreadPool(1); + + final ScatterGatherBackingStoreSupplier supp = new ScatterGatherBackingStoreSupplier() { + @Override + public ScatterGatherBackingStore get() throws IOException { + return new FileBasedScatterGatherBackingStore(tmp = File.createTempFile("parallelscatter", "n1")); + } + }; + + final ParallelScatterZipCreator zipCreator = new ParallelScatterZipCreator(es, supp, compressionLevel); + final Map<String, byte[]> entries = writeTestFilesAsCallable(zipCreator, consumerSupplier.apply(zipCreator)); + zipCreator.writeTo(zos); + zos.close(); + + // validate the content of the compressed files + try (final ZipFile zf = new ZipFile(result)) { + final Enumeration<ZipArchiveEntry> entriesInPhysicalOrder = zf.getEntriesInPhysicalOrder(); + while (entriesInPhysicalOrder.hasMoreElements()) { + final ZipArchiveEntry zipArchiveEntry = entriesInPhysicalOrder.nextElement(); + final InputStream inputStream = zf.getInputStream(zipArchiveEntry); + final byte[] actual = IOUtils.toByteArray(inputStream); + final byte[] expected = entries.remove(zipArchiveEntry.getName()); + assertArrayEquals("For " + zipArchiveEntry.getName(), expected, actual); + } + } + assertNotNull(zipCreator.getStatisticsMessage()); + } + private void removeEntriesFoundInZipFile(final File result, final Map<String, byte[]> entries) throws IOException { final ZipFile zf = new ZipFile(result); final Enumeration<ZipArchiveEntry> entriesInPhysicalOrder = zf.getEntriesInPhysicalOrder(); @@ -196,6 +300,69 @@ public class ParallelScatterZipCreatorTest { return entries; } + /** + * Try to compress the files in src/test/resources with size no bigger than + * {@value EXPECTED_FILE_SIZE} and 
with a number of files no bigger than + * {@value EXPECTED_FILES_NUMBER} + * + * @param zipCreator The ParallelScatterZipCreator + * @param consumer The parallel consumer + * @return A map using file name as key and file content as value + * @throws IOException if exceptions occur when opening files + */ + private Map<String, byte[]> writeTestFilesAsCallable(final ParallelScatterZipCreator zipCreator, + final CallableConsumer consumer) throws IOException { + final Map<String, byte[]> entries = new HashMap<>(); + File baseDir = getFile(""); + int filesCount = 0; + for (final File file : baseDir.listFiles()) { + // do not compress too many files + if (filesCount >= EXPECTED_FILES_NUMBER) { + break; + } + + // skip files that are too large + if (file.isDirectory() || file.length() > EXPECTED_FILE_SIZE) { + continue; + } + + entries.put(file.getName(), IOUtils.toByteArray(new FileInputStream(file))); + + final ZipArchiveEntry zipArchiveEntry = new ZipArchiveEntry(file.getName()); + zipArchiveEntry.setMethod(ZipEntry.DEFLATED); + zipArchiveEntry.setSize(file.length()); + zipArchiveEntry.setUnixMode(UnixStat.FILE_FLAG | 0664); + + final InputStreamSupplier iss = new InputStreamSupplier() { + @Override + public InputStream get() { + try { + return new FileInputStream(file); + } catch (FileNotFoundException e) { + return null; + } + } + }; + + final Callable<ScatterZipOutputStream> callable; + if (filesCount % 2 == 0) { + callable = zipCreator.createCallable(zipArchiveEntry, iss); + } else { + final ZipArchiveEntryRequestSupplier zaSupplier = new ZipArchiveEntryRequestSupplier() { + @Override + public ZipArchiveEntryRequest get() { + return ZipArchiveEntryRequest.createZipArchiveEntryRequest(zipArchiveEntry, iss); + } + }; + callable = zipCreator.createCallable(zaSupplier); + } + + consumer.accept(callable); + filesCount++; + } + return entries; + } + private ZipArchiveEntry createZipArchiveEntry(final Map<String, byte[]> entries, final int i, final byte[] payloadBytes) { 
final ZipArchiveEntry za = new ZipArchiveEntry( "file" + i); entries.put( za.getName(), payloadBytes);