This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 04444a485b [NO ISSUE][EXT] Configurable Gzip compression level
04444a485b is described below

commit 04444a485b6ea8739debd3602b2f323e0f35d244
Author: Murtadha Hubail <mhub...@apache.org>
AuthorDate: Mon Mar 18 02:21:16 2024 +0300

    [NO ISSUE][EXT] Configurable Gzip compression level
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    
    - Add an option "gzipCompressionLevel" to specify the
      compression level (1-9) to be used when gzip is used
      with COPY TO.
    - Default gzip compression level to -1 (library default)
      when not specified.
    - Make the compression buffer size equal to a frame size.
    
    Change-Id: I6e80691e232269620d76e6b6f414cff6856f3232
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18204
    Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Wail Alkowaileet <wael....@gmail.com>
---
 .../copy-to/query/query.02.update.sqlpp            |  3 +-
 .../external/util/ExternalDataConstants.java       |  1 +
 .../asterix/external/util/ExternalDataUtils.java   |  4 +++
 .../external/util/WriterValidationUtil.java        | 23 ++++++++++++++++
 .../GzipExternalFileCompressStreamFactory.java     | 17 ++++++++++--
 .../metadata/declared/MetadataProvider.java        |  2 +-
 .../metadata/provider/ExternalWriterProvider.java  | 32 ++++++++++++++++------
 7 files changed, 69 insertions(+), 13 deletions(-)

diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp
index 6df40c3fcb..1d7253b21b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp
@@ -36,7 +36,8 @@ WITH {
     "serviceEndpoint":"http://127.0.0.1:8001",
     "container":"playground",
     "format":"json",
-    "compression":"gzip"
+    "compression":"gzip",
+    "gzipCompressionLevel": "1"
 }
 
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index fc8f527047..79252ada33 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -304,6 +304,7 @@ public class ExternalDataConstants {
      * Compression constants
      */
     public static final String KEY_COMPRESSION_GZIP = "gzip";
+    public static final String KEY_COMPRESSION_GZIP_COMPRESSION_LEVEL = "gzipCompressionLevel";
 
     /**
      * Writer Constants
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index ae3b567f79..b02122d719 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -1085,4 +1085,8 @@ public class ExternalDataUtils {
                 return ExternalDataConstants.KEY_PATH;
         }
     }
+
+    public static boolean isGzipCompression(String compression) {
+        return ExternalDataConstants.KEY_COMPRESSION_GZIP.equalsIgnoreCase(compression);
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
index 00fe855195..b348c1e21c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.external.util;
 
+import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_REQ_PARAM_VAL;
+import static org.apache.asterix.external.util.ExternalDataConstants.KEY_FORMAT;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -59,6 +62,9 @@ public class WriterValidationUtil {
         checkSupported(ExternalDataConstants.KEY_WRITER_COMPRESSION, compression,
                 ExternalDataConstants.WRITER_SUPPORTED_COMPRESSION, ErrorCode.UNKNOWN_COMPRESSION_SCHEME,
                 sourceLocation, true);
+        if (ExternalDataUtils.isGzipCompression(compression)) {
+            validateGzipCompressionLevel(configuration, sourceLocation);
+        }
     }
 
     private static void validateMaxResult(Map<String, String> configuration, SourceLocation sourceLocation)
@@ -92,4 +98,21 @@ public class WriterValidationUtil {
         }
     }
 
+    private static void validateGzipCompressionLevel(Map<String, String> configuration, SourceLocation sourceLocation)
+            throws CompilationException {
+        String compressionLevelStr = configuration.get(ExternalDataConstants.KEY_COMPRESSION_GZIP_COMPRESSION_LEVEL);
+        if (compressionLevelStr == null) {
+            return;
+        }
+        try {
+            int compressionLevel = Integer.parseInt(compressionLevelStr);
+            if (compressionLevel < 1 || compressionLevel > 9) {
+                throw new CompilationException(INVALID_REQ_PARAM_VAL, sourceLocation,
+                        ExternalDataConstants.KEY_COMPRESSION_GZIP_COMPRESSION_LEVEL, compressionLevelStr);
+            }
+        } catch (NumberFormatException e) {
+            throw CompilationException.create(ErrorCode.INTEGER_VALUE_EXPECTED, sourceLocation, compressionLevelStr);
+        }
+    }
+
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java
index 5ef196d190..a5874874d5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java
@@ -22,19 +22,30 @@ import java.io.IOException;
 import java.io.OutputStream;
 
 import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+import org.apache.commons.compress.compressors.gzip.GzipParameters;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class GzipExternalFileCompressStreamFactory implements IExternalFileCompressStreamFactory {
     private static final long serialVersionUID = -7364595253362922025L;
-    public static IExternalFileCompressStreamFactory INSTANCE = new GzipExternalFileCompressStreamFactory();
+    private final int compressionLevel;
+    private final int bufferSize;
 
-    private GzipExternalFileCompressStreamFactory() {
+    public static GzipExternalFileCompressStreamFactory create(int compressionLevel, int bufferSize) {
+        return new GzipExternalFileCompressStreamFactory(compressionLevel, bufferSize);
+    }
+
+    private GzipExternalFileCompressStreamFactory(int compressionLevel, int bufferSize) {
+        this.compressionLevel = compressionLevel;
+        this.bufferSize = bufferSize;
     }
 
     @Override
     public OutputStream createStream(OutputStream outputStream) throws HyracksDataException {
         try {
-            return new GzipCompressorOutputStream(outputStream);
+            GzipParameters gzipParam = new GzipParameters();
+            gzipParam.setCompressionLevel(compressionLevel);
+            gzipParam.setBufferSize(bufferSize);
+            return new GzipCompressorOutputStream(outputStream, gzipParam);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 416e32f5a0..e861a3fc5f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -761,7 +761,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         fileWriterFactory.validate();
         String fileExtension = ExternalWriterProvider.getFileExtension(sink);
         int maxResult = ExternalWriterProvider.getMaxResult(sink);
-        IExternalPrinterFactory printerFactory = ExternalWriterProvider.createPrinter(sink, sourceType);
+        IExternalPrinterFactory printerFactory = ExternalWriterProvider.createPrinter(appCtx, sink, sourceType);
         ExternalFileWriterFactory writerFactory = new ExternalFileWriterFactory(fileWriterFactory, printerFactory,
                 fileExtension, maxResult, dynamicPathEvalFactory, staticPath, pathSourceLocation);
         SinkExternalWriterRuntimeFactory runtime = new SinkExternalWriterRuntimeFactory(sourceColumn, partitionColumns,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index 9253a48492..cf5120061f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -20,10 +20,12 @@ package org.apache.asterix.metadata.provider;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.zip.Deflater;
 
 import org.apache.asterix.cloud.writer.S3ExternalFileWriterFactory;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.writer.LocalFSExternalFileWriterFactory;
 import org.apache.asterix.external.writer.compressor.GzipExternalFileCompressStreamFactory;
 import org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
@@ -41,7 +43,6 @@ import org.apache.hyracks.control.cc.ClusterControllerService;
 
 public class ExternalWriterProvider {
     private static final Map<String, IExternalFileWriterFactoryProvider> CREATOR_MAP;
-    private static final Map<String, IExternalFileCompressStreamFactory> STREAM_COMPRESSORS;
 
     private ExternalWriterProvider() {
     }
@@ -50,10 +51,6 @@ public class ExternalWriterProvider {
         CREATOR_MAP = new HashMap<>();
         addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_LOCALFS, LocalFSExternalFileWriterFactory.PROVIDER);
         addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3, S3ExternalFileWriterFactory.PROVIDER);
-
-        STREAM_COMPRESSORS = new HashMap<>();
-        STREAM_COMPRESSORS.put(ExternalDataConstants.KEY_COMPRESSION_GZIP,
-                GzipExternalFileCompressStreamFactory.INSTANCE);
     }
 
     public static IExternalFileWriterFactory createWriterFactory(ICcApplicationContext appCtx, IWriteDataSink sink,
@@ -105,7 +102,8 @@ public class ExternalWriterProvider {
         CREATOR_MAP.put(adapterName.toLowerCase(), creator);
     }
 
-    public static IExternalPrinterFactory createPrinter(IWriteDataSink sink, Object sourceType) {
+    public static IExternalPrinterFactory createPrinter(ICcApplicationContext appCtx, IWriteDataSink sink,
+            Object sourceType) {
         Map<String, String> configuration = sink.getConfiguration();
         String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
 
@@ -116,8 +114,7 @@ public class ExternalWriterProvider {
 
         String compression = getCompression(configuration);
         IExternalFileCompressStreamFactory compressStreamFactory =
-                STREAM_COMPRESSORS.getOrDefault(compression, NoOpExternalFileCompressStreamFactory.INSTANCE);
-
+                createCompressionStreamFactory(appCtx, compression, configuration);
         IPrinterFactory printerFactory = CleanJSONPrinterFactoryProvider.INSTANCE.getPrinterFactory(sourceType);
         return new TextualExternalFilePrinterFactory(printerFactory, compressStreamFactory);
     }
@@ -139,4 +136,23 @@ public class ExternalWriterProvider {
 
         return creator.getSeparator();
     }
+
+    private static IExternalFileCompressStreamFactory createCompressionStreamFactory(ICcApplicationContext appCtx,
+            String compression, Map<String, String> configuration) {
+        if (ExternalDataUtils.isGzipCompression(compression)) {
+            return createGzipStreamFactory(appCtx, configuration);
+        }
+        return NoOpExternalFileCompressStreamFactory.INSTANCE;
+    }
+
+    private static GzipExternalFileCompressStreamFactory createGzipStreamFactory(ICcApplicationContext appCtx,
+            Map<String, String> configuration) {
+        int compressionLevel = Deflater.DEFAULT_COMPRESSION;
+        String gzipCompressionLevel = configuration.get(ExternalDataConstants.KEY_COMPRESSION_GZIP_COMPRESSION_LEVEL);
+        if (gzipCompressionLevel != null) {
+            compressionLevel = Integer.parseInt(gzipCompressionLevel);
+        }
+        return GzipExternalFileCompressStreamFactory.create(compressionLevel,
+                appCtx.getCompilerProperties().getFrameSize());
+    }
 }

Reply via email to