From Murtadha Hubail <[email protected]>: Murtadha Hubail has submitted this change (https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18204).
Change subject: [NO ISSUE][EXT] Configurable Gzip compression level ...................................................................... [NO ISSUE][EXT] Configurable Gzip compression level - user model changes: no - storage format changes: no - interface changes: no Details: - Add an option "gzipCompressionLevel" to specify the compression level (1-9) to be used when gzip is used with COPY TO. - Default gzip compression level to -1 (library default) when not specified. - Make the compression buffer size equal to a frame size. Change-Id: I6e80691e232269620d76e6b6f414cff6856f3232 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18204 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Wail Alkowaileet <[email protected]> --- M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java 7 files changed, 95 insertions(+), 13 deletions(-) Approvals: Wail Alkowaileet: Looks good to me, approved Jenkins: Verified; Verified diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp index 6df40c3..1d7253b 100644 --- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp @@ -36,7 +36,8 @@ "serviceEndpoint":"http://127.0.0.1:8001", "container":"playground", "format":"json", - "compression":"gzip" + "compression":"gzip", + "gzipCompressionLevel": "1" } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java index fc8f527..79252ad 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java @@ -304,6 +304,7 @@ * Compression constants */ public static final String KEY_COMPRESSION_GZIP = "gzip"; + public static final String KEY_COMPRESSION_GZIP_COMPRESSION_LEVEL = "gzipCompressionLevel"; /** * Writer Constants diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index ae3b567..b02122d 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -1085,4 +1085,8 @@ return ExternalDataConstants.KEY_PATH; } } + + public static boolean isGzipCompression(String compression) { + return ExternalDataConstants.KEY_COMPRESSION_GZIP.equalsIgnoreCase(compression); + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java index 00fe855..b348c1e 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java @@ -18,6 +18,9 @@ */ package org.apache.asterix.external.util; +import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_REQ_PARAM_VAL; +import static org.apache.asterix.external.util.ExternalDataConstants.KEY_FORMAT; + import java.util.List; import java.util.Map; import java.util.Set; @@ -59,6 +62,9 @@ checkSupported(ExternalDataConstants.KEY_WRITER_COMPRESSION, compression, ExternalDataConstants.WRITER_SUPPORTED_COMPRESSION, ErrorCode.UNKNOWN_COMPRESSION_SCHEME, sourceLocation, true); + if (ExternalDataUtils.isGzipCompression(compression)) { + validateGzipCompressionLevel(configuration, sourceLocation); + } } private static void validateMaxResult(Map<String, String> configuration, SourceLocation sourceLocation) @@ -92,4 +98,21 @@ } } + private static void validateGzipCompressionLevel(Map<String, String> configuration, SourceLocation sourceLocation) + throws CompilationException { + String compressionLevelStr = configuration.get(ExternalDataConstants.KEY_COMPRESSION_GZIP_COMPRESSION_LEVEL); + if (compressionLevelStr == null) { + return; + } + try { + int compressionLevel = Integer.parseInt(compressionLevelStr); + if (compressionLevel < 1 || compressionLevel > 9) { + throw new CompilationException(INVALID_REQ_PARAM_VAL, sourceLocation, + ExternalDataConstants.KEY_COMPRESSION_GZIP_COMPRESSION_LEVEL, compressionLevelStr); + } + } catch (NumberFormatException e) { + throw CompilationException.create(ErrorCode.INTEGER_VALUE_EXPECTED, sourceLocation, compressionLevelStr); + } + } + } diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java index 5ef196d..a587487 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java @@ -22,19 +22,30 @@ import java.io.OutputStream; import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; +import org.apache.commons.compress.compressors.gzip.GzipParameters; import org.apache.hyracks.api.exceptions.HyracksDataException; public class GzipExternalFileCompressStreamFactory implements IExternalFileCompressStreamFactory { private static final long serialVersionUID = -7364595253362922025L; - public static IExternalFileCompressStreamFactory INSTANCE = new GzipExternalFileCompressStreamFactory(); + private final int compressionLevel; + private final int bufferSize; - private GzipExternalFileCompressStreamFactory() { + public static GzipExternalFileCompressStreamFactory create(int compressionLevel, int bufferSize) { + return new GzipExternalFileCompressStreamFactory(compressionLevel, bufferSize); + } + + private GzipExternalFileCompressStreamFactory(int compressionLevel, int bufferSize) { + this.compressionLevel = compressionLevel; + this.bufferSize = bufferSize; } @Override public OutputStream createStream(OutputStream outputStream) throws HyracksDataException { try { - return new GzipCompressorOutputStream(outputStream); + GzipParameters gzipParam = new GzipParameters(); + gzipParam.setCompressionLevel(compressionLevel); + gzipParam.setBufferSize(bufferSize); + return new GzipCompressorOutputStream(outputStream, gzipParam); } catch (IOException e) { throw 
HyracksDataException.create(e); } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 416e32f..e861a3f 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -761,7 +761,7 @@ fileWriterFactory.validate(); String fileExtension = ExternalWriterProvider.getFileExtension(sink); int maxResult = ExternalWriterProvider.getMaxResult(sink); - IExternalPrinterFactory printerFactory = ExternalWriterProvider.createPrinter(sink, sourceType); + IExternalPrinterFactory printerFactory = ExternalWriterProvider.createPrinter(appCtx, sink, sourceType); ExternalFileWriterFactory writerFactory = new ExternalFileWriterFactory(fileWriterFactory, printerFactory, fileExtension, maxResult, dynamicPathEvalFactory, staticPath, pathSourceLocation); SinkExternalWriterRuntimeFactory runtime = new SinkExternalWriterRuntimeFactory(sourceColumn, partitionColumns, diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java index 9253a48..cf51200 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java @@ -20,10 +20,12 @@ import java.util.HashMap; import java.util.Map; +import java.util.zip.Deflater; import org.apache.asterix.cloud.writer.S3ExternalFileWriterFactory; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.external.util.ExternalDataConstants; +import 
org.apache.asterix.external.util.ExternalDataUtils; import org.apache.asterix.external.writer.LocalFSExternalFileWriterFactory; import org.apache.asterix.external.writer.compressor.GzipExternalFileCompressStreamFactory; import org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory; @@ -41,7 +43,6 @@ public class ExternalWriterProvider { private static final Map<String, IExternalFileWriterFactoryProvider> CREATOR_MAP; - private static final Map<String, IExternalFileCompressStreamFactory> STREAM_COMPRESSORS; private ExternalWriterProvider() { } @@ -50,10 +51,6 @@ CREATOR_MAP = new HashMap<>(); addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_LOCALFS, LocalFSExternalFileWriterFactory.PROVIDER); addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3, S3ExternalFileWriterFactory.PROVIDER); - - STREAM_COMPRESSORS = new HashMap<>(); - STREAM_COMPRESSORS.put(ExternalDataConstants.KEY_COMPRESSION_GZIP, - GzipExternalFileCompressStreamFactory.INSTANCE); } public static IExternalFileWriterFactory createWriterFactory(ICcApplicationContext appCtx, IWriteDataSink sink, @@ -105,7 +102,8 @@ CREATOR_MAP.put(adapterName.toLowerCase(), creator); } - public static IExternalPrinterFactory createPrinter(IWriteDataSink sink, Object sourceType) { + public static IExternalPrinterFactory createPrinter(ICcApplicationContext appCtx, IWriteDataSink sink, + Object sourceType) { Map<String, String> configuration = sink.getConfiguration(); String format = configuration.get(ExternalDataConstants.KEY_FORMAT); @@ -116,8 +114,7 @@ String compression = getCompression(configuration); IExternalFileCompressStreamFactory compressStreamFactory = - STREAM_COMPRESSORS.getOrDefault(compression, NoOpExternalFileCompressStreamFactory.INSTANCE); - + createCompressionStreamFactory(appCtx, compression, configuration); IPrinterFactory printerFactory = CleanJSONPrinterFactoryProvider.INSTANCE.getPrinterFactory(sourceType); return new TextualExternalFilePrinterFactory(printerFactory, 
compressStreamFactory); } @@ -139,4 +136,23 @@ return creator.getSeparator(); } + + private static IExternalFileCompressStreamFactory createCompressionStreamFactory(ICcApplicationContext appCtx, + String compression, Map<String, String> configuration) { + if (ExternalDataUtils.isGzipCompression(compression)) { + return createGzipStreamFactory(appCtx, configuration); + } + return NoOpExternalFileCompressStreamFactory.INSTANCE; + } + + private static GzipExternalFileCompressStreamFactory createGzipStreamFactory(ICcApplicationContext appCtx, + Map<String, String> configuration) { + int compressionLevel = Deflater.DEFAULT_COMPRESSION; + String gzipCompressionLevel = configuration.get(ExternalDataConstants.KEY_COMPRESSION_GZIP_COMPRESSION_LEVEL); + if (gzipCompressionLevel != null) { + compressionLevel = Integer.parseInt(gzipCompressionLevel); + } + return GzipExternalFileCompressStreamFactory.create(compressionLevel, + appCtx.getCompilerProperties().getFrameSize()); + } } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18204 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: I6e80691e232269620d76e6b6f414cff6856f3232 Gerrit-Change-Number: 18204 Gerrit-PatchSet: 2 Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Wail Alkowaileet <[email protected]> Gerrit-MessageType: merged
