This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new 04444a485b [NO ISSUE][EXT] Configurable Gzip compression level 04444a485b is described below commit 04444a485b6ea8739debd3602b2f323e0f35d244 Author: Murtadha Hubail <mhub...@apache.org> AuthorDate: Mon Mar 18 02:21:16 2024 +0300 [NO ISSUE][EXT] Configurable Gzip compression level - user model changes: no - storage format changes: no - interface changes: no Details: - Add an option "gzipCompressionLevel" to specify the compression level (1-9) to be used when gzip is used with COPY TO. - Default gzip compression level to -1 (library default) when not specified. - Make the compression buffer size equal to a frame size. Change-Id: I6e80691e232269620d76e6b6f414cff6856f3232 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18204 Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Wail Alkowaileet <wael....@gmail.com> --- .../copy-to/query/query.02.update.sqlpp | 3 +- .../external/util/ExternalDataConstants.java | 1 + .../asterix/external/util/ExternalDataUtils.java | 4 +++ .../external/util/WriterValidationUtil.java | 23 ++++++++++++++++ .../GzipExternalFileCompressStreamFactory.java | 17 ++++++++++-- .../metadata/declared/MetadataProvider.java | 2 +- .../metadata/provider/ExternalWriterProvider.java | 32 ++++++++++++++++------ 7 files changed, 69 insertions(+), 13 deletions(-) diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp index 6df40c3fcb..1d7253b21b 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp @@ -36,7 +36,8 @@ WITH { "serviceEndpoint":"http://127.0.0.1:8001", 
"container":"playground", "format":"json", - "compression":"gzip" + "compression":"gzip", + "gzipCompressionLevel": "1" } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java index fc8f527047..79252ada33 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java @@ -304,6 +304,7 @@ public class ExternalDataConstants { * Compression constants */ public static final String KEY_COMPRESSION_GZIP = "gzip"; + public static final String KEY_COMPRESSION_GZIP_COMPRESSION_LEVEL = "gzipCompressionLevel"; /** * Writer Constants diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index ae3b567f79..b02122d719 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -1085,4 +1085,8 @@ public class ExternalDataUtils { return ExternalDataConstants.KEY_PATH; } } + + public static boolean isGzipCompression(String compression) { + return ExternalDataConstants.KEY_COMPRESSION_GZIP.equalsIgnoreCase(compression); + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java index 00fe855195..b348c1e21c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java +++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java @@ -18,6 +18,9 @@ */ package org.apache.asterix.external.util; +import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_REQ_PARAM_VAL; +import static org.apache.asterix.external.util.ExternalDataConstants.KEY_FORMAT; + import java.util.List; import java.util.Map; import java.util.Set; @@ -59,6 +62,9 @@ public class WriterValidationUtil { checkSupported(ExternalDataConstants.KEY_WRITER_COMPRESSION, compression, ExternalDataConstants.WRITER_SUPPORTED_COMPRESSION, ErrorCode.UNKNOWN_COMPRESSION_SCHEME, sourceLocation, true); + if (ExternalDataUtils.isGzipCompression(compression)) { + validateGzipCompressionLevel(configuration, sourceLocation); + } } private static void validateMaxResult(Map<String, String> configuration, SourceLocation sourceLocation) @@ -92,4 +98,21 @@ public class WriterValidationUtil { } } + private static void validateGzipCompressionLevel(Map<String, String> configuration, SourceLocation sourceLocation) + throws CompilationException { + String compressionLevelStr = configuration.get(ExternalDataConstants.KEY_COMPRESSION_GZIP_COMPRESSION_LEVEL); + if (compressionLevelStr == null) { + return; + } + try { + int compressionLevel = Integer.parseInt(compressionLevelStr); + if (compressionLevel < 1 || compressionLevel > 9) { + throw new CompilationException(INVALID_REQ_PARAM_VAL, sourceLocation, + ExternalDataConstants.KEY_COMPRESSION_GZIP_COMPRESSION_LEVEL, compressionLevelStr); + } + } catch (NumberFormatException e) { + throw CompilationException.create(ErrorCode.INTEGER_VALUE_EXPECTED, sourceLocation, compressionLevelStr); + } + } + } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java index 
5ef196d190..a5874874d5 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java @@ -22,19 +22,30 @@ import java.io.IOException; import java.io.OutputStream; import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; +import org.apache.commons.compress.compressors.gzip.GzipParameters; import org.apache.hyracks.api.exceptions.HyracksDataException; public class GzipExternalFileCompressStreamFactory implements IExternalFileCompressStreamFactory { private static final long serialVersionUID = -7364595253362922025L; - public static IExternalFileCompressStreamFactory INSTANCE = new GzipExternalFileCompressStreamFactory(); + private final int compressionLevel; + private final int bufferSize; - private GzipExternalFileCompressStreamFactory() { + public static GzipExternalFileCompressStreamFactory create(int compressionLevel, int bufferSize) { + return new GzipExternalFileCompressStreamFactory(compressionLevel, bufferSize); + } + + private GzipExternalFileCompressStreamFactory(int compressionLevel, int bufferSize) { + this.compressionLevel = compressionLevel; + this.bufferSize = bufferSize; } @Override public OutputStream createStream(OutputStream outputStream) throws HyracksDataException { try { - return new GzipCompressorOutputStream(outputStream); + GzipParameters gzipParam = new GzipParameters(); + gzipParam.setCompressionLevel(compressionLevel); + gzipParam.setBufferSize(bufferSize); + return new GzipCompressorOutputStream(outputStream, gzipParam); } catch (IOException e) { throw HyracksDataException.create(e); } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 416e32f5a0..e861a3fc5f 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -761,7 +761,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> fileWriterFactory.validate(); String fileExtension = ExternalWriterProvider.getFileExtension(sink); int maxResult = ExternalWriterProvider.getMaxResult(sink); - IExternalPrinterFactory printerFactory = ExternalWriterProvider.createPrinter(sink, sourceType); + IExternalPrinterFactory printerFactory = ExternalWriterProvider.createPrinter(appCtx, sink, sourceType); ExternalFileWriterFactory writerFactory = new ExternalFileWriterFactory(fileWriterFactory, printerFactory, fileExtension, maxResult, dynamicPathEvalFactory, staticPath, pathSourceLocation); SinkExternalWriterRuntimeFactory runtime = new SinkExternalWriterRuntimeFactory(sourceColumn, partitionColumns, diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java index 9253a48492..cf5120061f 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java @@ -20,10 +20,12 @@ package org.apache.asterix.metadata.provider; import java.util.HashMap; import java.util.Map; +import java.util.zip.Deflater; import org.apache.asterix.cloud.writer.S3ExternalFileWriterFactory; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.external.util.ExternalDataConstants; +import 
org.apache.asterix.external.util.ExternalDataUtils; import org.apache.asterix.external.writer.LocalFSExternalFileWriterFactory; import org.apache.asterix.external.writer.compressor.GzipExternalFileCompressStreamFactory; import org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory; @@ -41,7 +43,6 @@ import org.apache.hyracks.control.cc.ClusterControllerService; public class ExternalWriterProvider { private static final Map<String, IExternalFileWriterFactoryProvider> CREATOR_MAP; - private static final Map<String, IExternalFileCompressStreamFactory> STREAM_COMPRESSORS; private ExternalWriterProvider() { } @@ -50,10 +51,6 @@ public class ExternalWriterProvider { CREATOR_MAP = new HashMap<>(); addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_LOCALFS, LocalFSExternalFileWriterFactory.PROVIDER); addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3, S3ExternalFileWriterFactory.PROVIDER); - - STREAM_COMPRESSORS = new HashMap<>(); - STREAM_COMPRESSORS.put(ExternalDataConstants.KEY_COMPRESSION_GZIP, - GzipExternalFileCompressStreamFactory.INSTANCE); } public static IExternalFileWriterFactory createWriterFactory(ICcApplicationContext appCtx, IWriteDataSink sink, @@ -105,7 +102,8 @@ public class ExternalWriterProvider { CREATOR_MAP.put(adapterName.toLowerCase(), creator); } - public static IExternalPrinterFactory createPrinter(IWriteDataSink sink, Object sourceType) { + public static IExternalPrinterFactory createPrinter(ICcApplicationContext appCtx, IWriteDataSink sink, + Object sourceType) { Map<String, String> configuration = sink.getConfiguration(); String format = configuration.get(ExternalDataConstants.KEY_FORMAT); @@ -116,8 +114,7 @@ public class ExternalWriterProvider { String compression = getCompression(configuration); IExternalFileCompressStreamFactory compressStreamFactory = - STREAM_COMPRESSORS.getOrDefault(compression, NoOpExternalFileCompressStreamFactory.INSTANCE); - + createCompressionStreamFactory(appCtx, compression, 
configuration); IPrinterFactory printerFactory = CleanJSONPrinterFactoryProvider.INSTANCE.getPrinterFactory(sourceType); return new TextualExternalFilePrinterFactory(printerFactory, compressStreamFactory); } @@ -139,4 +136,23 @@ public class ExternalWriterProvider { return creator.getSeparator(); } + + private static IExternalFileCompressStreamFactory createCompressionStreamFactory(ICcApplicationContext appCtx, + String compression, Map<String, String> configuration) { + if (ExternalDataUtils.isGzipCompression(compression)) { + return createGzipStreamFactory(appCtx, configuration); + } + return NoOpExternalFileCompressStreamFactory.INSTANCE; + } + + private static GzipExternalFileCompressStreamFactory createGzipStreamFactory(ICcApplicationContext appCtx, + Map<String, String> configuration) { + int compressionLevel = Deflater.DEFAULT_COMPRESSION; + String gzipCompressionLevel = configuration.get(ExternalDataConstants.KEY_COMPRESSION_GZIP_COMPRESSION_LEVEL); + if (gzipCompressionLevel != null) { + compressionLevel = Integer.parseInt(gzipCompressionLevel); + } + return GzipExternalFileCompressStreamFactory.create(compressionLevel, + appCtx.getCompilerProperties().getFrameSize()); + } }