From Murtadha Hubail <[email protected]>:

Murtadha Hubail has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18204 )


Change subject: [NO ISSUE][EXT] Configurable Gzip compression level
......................................................................

[NO ISSUE][EXT] Configurable Gzip compression level

- user model changes: no
- storage format changes: no
- interface changes: no

Details:

- Add an option "gzipCompressionLevel" to specify the
  compression level (1-9) to use when COPY TO writes
  gzip-compressed output.
- Default the gzip compression level to -1 (the library
  default, Deflater.DEFAULT_COMPRESSION) when the option
  is not specified.
- Make the compression buffer size equal to the configured
  frame size.

Change-Id: I6e80691e232269620d76e6b6f414cff6856f3232
---
M 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
7 files changed, 91 insertions(+), 13 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/04/18204/1

diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp
index 6df40c3..1d7253b 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/query/query.02.update.sqlpp
@@ -36,7 +36,8 @@
     "serviceEndpoint":"http://127.0.0.1:8001",
     "container":"playground",
     "format":"json",
-    "compression":"gzip"
+    "compression":"gzip",
+    "gzipCompressionLevel": "1"
 }


diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index fc8f527..79252ad 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -304,6 +304,7 @@
      * Compression constants
      */
     public static final String KEY_COMPRESSION_GZIP = "gzip";
+    public static final String KEY_COMPRESSION_GZIP_COMPRESSION_LEVEL = 
"gzipCompressionLevel";

     /**
      * Writer Constants
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index ae3b567..b02122d 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -1085,4 +1085,8 @@
                 return ExternalDataConstants.KEY_PATH;
         }
     }
+
+    public static boolean isGzipCompression(String compression) {
+        return 
ExternalDataConstants.KEY_COMPRESSION_GZIP.equalsIgnoreCase(compression);
+    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
index 00fe855..b348c1e 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/WriterValidationUtil.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.external.util;

+import static 
org.apache.asterix.common.exceptions.ErrorCode.INVALID_REQ_PARAM_VAL;
+import static 
org.apache.asterix.external.util.ExternalDataConstants.KEY_FORMAT;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -59,6 +62,9 @@
         checkSupported(ExternalDataConstants.KEY_WRITER_COMPRESSION, 
compression,
                 ExternalDataConstants.WRITER_SUPPORTED_COMPRESSION, 
ErrorCode.UNKNOWN_COMPRESSION_SCHEME,
                 sourceLocation, true);
+        if (ExternalDataUtils.isGzipCompression(compression)) {
+            validateGzipCompressionLevel(configuration, sourceLocation);
+        }
     }

     private static void validateMaxResult(Map<String, String> configuration, 
SourceLocation sourceLocation)
@@ -92,4 +98,21 @@
         }
     }

+    private static void validateGzipCompressionLevel(Map<String, String> 
configuration, SourceLocation sourceLocation)
+            throws CompilationException {
+        String compressionLevelStr = 
configuration.get(ExternalDataConstants.KEY_COMPRESSION_GZIP_COMPRESSION_LEVEL);
+        if (compressionLevelStr == null) {
+            return;
+        }
+        try {
+            int compressionLevel = Integer.parseInt(compressionLevelStr);
+            if (compressionLevel < 1 || compressionLevel > 9) {
+                throw new CompilationException(INVALID_REQ_PARAM_VAL, 
sourceLocation,
+                        
ExternalDataConstants.KEY_COMPRESSION_GZIP_COMPRESSION_LEVEL, 
compressionLevelStr);
+            }
+        } catch (NumberFormatException e) {
+            throw 
CompilationException.create(ErrorCode.INTEGER_VALUE_EXPECTED, sourceLocation, 
compressionLevelStr);
+        }
+    }
+
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java
index 5ef196d..a587487 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/compressor/GzipExternalFileCompressStreamFactory.java
@@ -22,19 +22,30 @@
 import java.io.OutputStream;

 import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+import org.apache.commons.compress.compressors.gzip.GzipParameters;
 import org.apache.hyracks.api.exceptions.HyracksDataException;

 public class GzipExternalFileCompressStreamFactory implements 
IExternalFileCompressStreamFactory {
     private static final long serialVersionUID = -7364595253362922025L;
-    public static IExternalFileCompressStreamFactory INSTANCE = new 
GzipExternalFileCompressStreamFactory();
+    private final int compressionLevel;
+    private final int bufferSize;

-    private GzipExternalFileCompressStreamFactory() {
+    public static GzipExternalFileCompressStreamFactory create(int 
compressionLevel, int bufferSize) {
+        return new GzipExternalFileCompressStreamFactory(compressionLevel, 
bufferSize);
+    }
+
+    private GzipExternalFileCompressStreamFactory(int compressionLevel, int 
bufferSize) {
+        this.compressionLevel = compressionLevel;
+        this.bufferSize = bufferSize;
     }

     @Override
     public OutputStream createStream(OutputStream outputStream) throws 
HyracksDataException {
         try {
-            return new GzipCompressorOutputStream(outputStream);
+            GzipParameters gzipParam = new GzipParameters();
+            gzipParam.setCompressionLevel(compressionLevel);
+            gzipParam.setBufferSize(bufferSize);
+            return new GzipCompressorOutputStream(outputStream, gzipParam);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 416e32f..e861a3f 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -761,7 +761,7 @@
         fileWriterFactory.validate();
         String fileExtension = ExternalWriterProvider.getFileExtension(sink);
         int maxResult = ExternalWriterProvider.getMaxResult(sink);
-        IExternalPrinterFactory printerFactory = 
ExternalWriterProvider.createPrinter(sink, sourceType);
+        IExternalPrinterFactory printerFactory = 
ExternalWriterProvider.createPrinter(appCtx, sink, sourceType);
         ExternalFileWriterFactory writerFactory = new 
ExternalFileWriterFactory(fileWriterFactory, printerFactory,
                 fileExtension, maxResult, dynamicPathEvalFactory, staticPath, 
pathSourceLocation);
         SinkExternalWriterRuntimeFactory runtime = new 
SinkExternalWriterRuntimeFactory(sourceColumn, partitionColumns,
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index 9253a48..cf51200 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -20,10 +20,12 @@

 import java.util.HashMap;
 import java.util.Map;
+import java.util.zip.Deflater;

 import org.apache.asterix.cloud.writer.S3ExternalFileWriterFactory;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.writer.LocalFSExternalFileWriterFactory;
 import 
org.apache.asterix.external.writer.compressor.GzipExternalFileCompressStreamFactory;
 import 
org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
@@ -41,7 +43,6 @@

 public class ExternalWriterProvider {
     private static final Map<String, IExternalFileWriterFactoryProvider> 
CREATOR_MAP;
-    private static final Map<String, IExternalFileCompressStreamFactory> 
STREAM_COMPRESSORS;

     private ExternalWriterProvider() {
     }
@@ -50,10 +51,6 @@
         CREATOR_MAP = new HashMap<>();
         addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_LOCALFS, 
LocalFSExternalFileWriterFactory.PROVIDER);
         addCreator(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3, 
S3ExternalFileWriterFactory.PROVIDER);
-
-        STREAM_COMPRESSORS = new HashMap<>();
-        STREAM_COMPRESSORS.put(ExternalDataConstants.KEY_COMPRESSION_GZIP,
-                GzipExternalFileCompressStreamFactory.INSTANCE);
     }

     public static IExternalFileWriterFactory 
createWriterFactory(ICcApplicationContext appCtx, IWriteDataSink sink,
@@ -105,7 +102,8 @@
         CREATOR_MAP.put(adapterName.toLowerCase(), creator);
     }

-    public static IExternalPrinterFactory createPrinter(IWriteDataSink sink, 
Object sourceType) {
+    public static IExternalPrinterFactory createPrinter(ICcApplicationContext 
appCtx, IWriteDataSink sink,
+            Object sourceType) {
         Map<String, String> configuration = sink.getConfiguration();
         String format = configuration.get(ExternalDataConstants.KEY_FORMAT);

@@ -116,8 +114,7 @@

         String compression = getCompression(configuration);
         IExternalFileCompressStreamFactory compressStreamFactory =
-                STREAM_COMPRESSORS.getOrDefault(compression, 
NoOpExternalFileCompressStreamFactory.INSTANCE);
-
+                createCompressionStreamFactory(appCtx, compression, 
configuration);
         IPrinterFactory printerFactory = 
CleanJSONPrinterFactoryProvider.INSTANCE.getPrinterFactory(sourceType);
         return new TextualExternalFilePrinterFactory(printerFactory, 
compressStreamFactory);
     }
@@ -139,4 +136,23 @@

         return creator.getSeparator();
     }
+
+    private static IExternalFileCompressStreamFactory 
createCompressionStreamFactory(ICcApplicationContext appCtx,
+            String compression, Map<String, String> configuration) {
+        if (ExternalDataUtils.isGzipCompression(compression)) {
+            return createGzipStreamFactory(appCtx, configuration);
+        }
+        return NoOpExternalFileCompressStreamFactory.INSTANCE;
+    }
+
+    private static GzipExternalFileCompressStreamFactory 
createGzipStreamFactory(ICcApplicationContext appCtx,
+            Map<String, String> configuration) {
+        int compressionLevel = Deflater.DEFAULT_COMPRESSION;
+        String gzipCompressionLevel = 
configuration.get(ExternalDataConstants.KEY_COMPRESSION_GZIP_COMPRESSION_LEVEL);
+        if (gzipCompressionLevel != null) {
+            compressionLevel = Integer.parseInt(gzipCompressionLevel);
+        }
+        return GzipExternalFileCompressStreamFactory.create(compressionLevel,
+                appCtx.getCompilerProperties().getFrameSize());
+    }
 }

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18204
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I6e80691e232269620d76e6b6f414cff6856f3232
Gerrit-Change-Number: 18204
Gerrit-PatchSet: 1
Gerrit-Owner: Murtadha Hubail <[email protected]>
Gerrit-MessageType: newchange

Reply via email to