>From <[email protected]>:

[email protected] has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18200 )


Change subject: [WIP] Copy to S3 in parquet format
......................................................................

[WIP] Copy to S3 in parquet format

Change-Id: I108be151cadbb7989d22f126296280964c4b838f
---
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinter.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
M asterixdb/asterix-om/pom.xml
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinterFactory.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
A 
asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ParquetRecordVisitor.java
6 files changed, 413 insertions(+), 3 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/00/18200/1

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index c0af89f..42db85d 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -82,6 +82,7 @@
     public static final String KEY_EXPRESSION = "expression";
     public static final String KEY_LOCAL_SOCKET_PATH = "local-socket-path";
     public static final String KEY_FORMAT = "format";
+    public static final String SCHEMA_FORMAT = "schema";
     public static final String KEY_INCLUDE = "include";
     public static final String KEY_EXCLUDE = "exclude";
     public static final String KEY_QUOTE = "quote";
@@ -316,7 +317,7 @@
     public static final Set<String> WRITER_SUPPORTED_COMPRESSION;

     static {
-        WRITER_SUPPORTED_FORMATS = Set.of(FORMAT_JSON_LOWER_CASE);
+        WRITER_SUPPORTED_FORMATS = Set.of(FORMAT_JSON_LOWER_CASE, 
FORMAT_PARQUET);
         WRITER_SUPPORTED_ADAPTERS = 
Set.of(ALIAS_LOCALFS_ADAPTER.toLowerCase(), 
KEY_ADAPTER_NAME_AWS_S3.toLowerCase());
         WRITER_SUPPORTED_COMPRESSION = Set.of(KEY_COMPRESSION_GZIP);
     }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinter.java
new file mode 100644
index 0000000..d5f5d8e
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinter.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer;
+
+import static 
org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.*;
+import 
org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
+import org.apache.asterix.om.pointables.printer.parquet.ParquetRecordVisitor;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hyracks.algebricks.data.IPrinter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.util.HadoopStreams;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+import org.apache.parquet.schema.MessageType;
+
+public class TextualExternalFileParquetPrinter implements IExternalPrinter {
+    private final IPrinter printer;
+    private final IExternalFileCompressStreamFactory compressStreamFactory;
+    private final Object typeInfo;
+    private TextualOutputStreamDelegate delegate;
+    private PrintStream printStream;
+    private String schema;
+    private TestOutputFile testOutputFile;
+    private ParquetWriter<Group> writer;
+    private MessageType msg;
+
+    private ParquetRecordVisitor parquetRecordVisitor;
+
+    public TextualExternalFileParquetPrinter(IPrinter printer, 
IExternalFileCompressStreamFactory compressStreamFactory,
+            String schema, Object typeInfo) {
+        this.printer = printer;
+        this.compressStreamFactory = compressStreamFactory;
+        this.schema = schema;
+        this.typeInfo = typeInfo;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        printer.init();
+    }
+
+    @Override
+    public void newStream(OutputStream outputStream) throws 
HyracksDataException {
+        if (printStream != null) {
+            close();
+        }
+        delegate = new 
TextualOutputStreamDelegate(compressStreamFactory.createStream(outputStream));
+        printStream = new PrintStream(delegate);
+        testOutputFile = new TestOutputFile(printStream);
+        Configuration conf = new Configuration();
+        parquetRecordVisitor = new ParquetRecordVisitor(schema, typeInfo);
+        GroupWriteSupport.setSchema(parquetRecordVisitor.getSchema(), conf);
+        try {
+            writer = 
ExampleParquetWriter.builder(testOutputFile).withCompressionCodec(UNCOMPRESSED)
+                    
.withRowGroupSize(1024).withPageSize(1024).withDictionaryPageSize(512).enableDictionaryEncoding()
+                    
.withValidation(false).withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0).withConf(conf)
+                    .build();
+
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    @Override
+    public void print(IValueReference value) throws HyracksDataException {
+
+        Group group = parquetRecordVisitor.convertToParquetTuple(value);
+        try {
+            this.writer.write(group);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        if (this.writer != null) {
+            try {
+                // This should also close printStream.close()
+                this.writer.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            delegate.checkError();
+            delegate = null;
+        }
+        //        if (printStream != null) {
+        //            printStream.close();
+        //            printStream = null;
+        //        }
+    }
+
+    private static class TestOutputFile implements OutputFile {
+        private final PrintStream ps;
+
+        private final PositionOutputStream pos;
+
+        public TestOutputFile(PrintStream ps) {
+            this.ps = ps;
+            FSDataOutputStream fs = new FSDataOutputStream(ps, new 
FileSystem.Statistics("test"));
+            this.pos = HadoopStreams.wrap(fs);
+        }
+
+        @Override
+        public PositionOutputStream create(long blockSizeHint) throws 
IOException {
+            return pos;
+        }
+
+        @Override
+        public PositionOutputStream createOrOverwrite(long blockSizeHint) 
throws IOException {
+            return pos;
+        }
+
+        @Override
+        public boolean supportsBlockSize() {
+            return false;
+        }
+
+        @Override
+        public long defaultBlockSize() {
+            return 33554432L;
+        }
+    }
+
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinterFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinterFactory.java
new file mode 100644
index 0000000..7c5aa2c
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinterFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer;
+
+import 
org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.hyracks.algebricks.data.IPrinterFactory;
+
+public class TextualExternalFileParquetPrinterFactory extends 
TextualExternalPrinterFactory {
+    private static final long serialVersionUID = 8971234908711234L;
+    private final IExternalFileCompressStreamFactory compressStreamFactory;
+    private final String schema;
+
+    Object typeInfo;
+
+    public TextualExternalFileParquetPrinterFactory(IPrinterFactory 
printerFactory,
+            IExternalFileCompressStreamFactory compressStreamFactory, String 
schema, Object typeInfo) {
+        super(printerFactory);
+        this.compressStreamFactory = compressStreamFactory;
+        this.schema = schema;
+        this.typeInfo = typeInfo;
+    }
+
+    @Override
+    public IExternalPrinter createPrinter() {
+        return new 
TextualExternalFileParquetPrinter(printerFactory.createPrinter(), 
compressStreamFactory, schema,
+                typeInfo);
+    }
+}
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index 9253a48..97f8883 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -28,6 +28,7 @@
 import 
org.apache.asterix.external.writer.compressor.GzipExternalFileCompressStreamFactory;
 import 
org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
 import 
org.apache.asterix.external.writer.compressor.NoOpExternalFileCompressStreamFactory;
+import 
org.apache.asterix.external.writer.printer.TextualExternalFileParquetPrinterFactory;
 import 
org.apache.asterix.external.writer.printer.TextualExternalFilePrinterFactory;
 import org.apache.asterix.formats.nontagged.CleanJSONPrinterFactoryProvider;
 import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
@@ -110,7 +111,8 @@
         String format = configuration.get(ExternalDataConstants.KEY_FORMAT);

         // Only JSON is supported for now
-        if 
(!ExternalDataConstants.FORMAT_JSON_LOWER_CASE.equalsIgnoreCase(format)) {
+        if 
(!ExternalDataConstants.FORMAT_JSON_LOWER_CASE.equalsIgnoreCase(format)
+                && 
!ExternalDataConstants.FORMAT_PARQUET.equalsIgnoreCase(format)) {
             throw new UnsupportedOperationException("Unsupported format " + 
format);
         }

@@ -119,7 +121,21 @@
                 STREAM_COMPRESSORS.getOrDefault(compression, 
NoOpExternalFileCompressStreamFactory.INSTANCE);

         IPrinterFactory printerFactory = 
CleanJSONPrinterFactoryProvider.INSTANCE.getPrinterFactory(sourceType);
-        return new TextualExternalFilePrinterFactory(printerFactory, 
compressStreamFactory);
+
+        switch (format) {
+            case ExternalDataConstants.FORMAT_JSON_LOWER_CASE:
+                return new TextualExternalFilePrinterFactory(printerFactory, 
compressStreamFactory);
+            case ExternalDataConstants.FORMAT_PARQUET:
+
+                if 
(!configuration.containsKey(ExternalDataConstants.SCHEMA_FORMAT)) {
+                    throw new UnsupportedOperationException("Schema not 
provided for parquet");
+                }
+                String schema = 
configuration.get(ExternalDataConstants.SCHEMA_FORMAT);
+                return new 
TextualExternalFileParquetPrinterFactory(printerFactory, compressStreamFactory, 
schema,
+                        sourceType);
+            default:
+                throw new UnsupportedOperationException("Unsupported format " 
+ format);
+        }
     }

     private static String getFormat(Map<String, String> configuration) {
diff --git a/asterixdb/asterix-om/pom.xml b/asterixdb/asterix-om/pom.xml
index 6db4840..19fbe3a 100644
--- a/asterixdb/asterix-om/pom.xml
+++ b/asterixdb/asterix-om/pom.xml
@@ -160,5 +160,14 @@
       <groupId>it.unimi.dsi</groupId>
       <artifactId>fastutil</artifactId>
     </dependency>
+      <dependency>
+          <groupId>org.apache.parquet</groupId>
+          <artifactId>parquet-column</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+          <!-- TODO(review): inherit this version from the root POM's dependencyManagement
+               instead of hard-coding it here -->
+          <version>3.3.6</version>
+      </dependency>
   </dependencies>
 </project>
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ParquetRecordVisitor.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ParquetRecordVisitor.java
new file mode 100644
index 0000000..f1f2df9
--- /dev/null
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ParquetRecordVisitor.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.om.pointables.printer.parquet;
+
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.List;
+
+import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.*;
+import org.apache.asterix.om.pointables.*;
+import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
+import org.apache.asterix.om.pointables.base.IVisitablePointable;
+import org.apache.asterix.om.pointables.visitor.IVisitablePointableVisitor;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+public class ParquetRecordVisitor implements IVisitablePointableVisitor<Void, 
Pair<Group, Type>> {
+
+    private final MessageType schema;
+    private final Object typeInfo;
+
+    public static String FlatValuePrinter(IVisitablePointable value) {
+        byte[] b = value.getByteArray();
+        int s = value.getStartOffset(), l = value.getLength();
+        ATypeTag typeTag = 
EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b[s]);
+
+        final String[] ans = { "" };
+        OutputStream os = new OutputStream() {
+            @Override
+            public void write(int b) throws IOException {
+                char c = (char) b;
+                ans[0] = ans[0] + c;
+            }
+        };
+        PrintStream ps = new PrintStream(os);
+        try {
+            AObjectPrinterFactory.printFlatValue(typeTag, b, s, l, ps);
+        } catch (HyracksDataException e) {
+            throw new RuntimeException(e);
+        }
+
+        switch (typeTag) {
+            case STRING:
+                return ans[0].substring(1, ans[0].length() - 1);
+
+            default:
+                return ans[0];
+        }
+    }
+
+    public ParquetRecordVisitor(String schemaString, Object typeInfo) throws 
HyracksDataException {
+
+        try {
+            this.schema = parseMessageType(schemaString);
+        } catch (Exception e) {
+            throw new HyracksDataException("Illegal Parquet Schema provided", 
e);
+        }
+        this.typeInfo = typeInfo;
+    }
+
+    public Group convertToParquetTuple(IValueReference value) throws 
HyracksDataException {
+
+        SimpleGroupFactory groupFactory = new SimpleGroupFactory(this.schema);
+        Group group = groupFactory.newGroup();
+
+        schema.getName();
+
+        IAType type = (IAType) typeInfo;
+        if (type.getTypeTag() != ATypeTag.OBJECT) {
+            throw new HyracksDataException("Type Unsupported for parquet 
printing");
+        }
+        ARecordType recType = (ARecordType) type;
+        final PointableAllocator allocator = new PointableAllocator();
+        final IAType inputType =
+                recType == null ? 
DefaultOpenFieldType.getDefaultOpenFieldType(ATypeTag.OBJECT) : recType;
+        final ARecordVisitablePointable recAccessor = 
allocator.allocateRecordValue(inputType);
+        recAccessor.set(value.getByteArray(), value.getStartOffset(), 
value.getLength());
+
+        if (recAccessor.getFieldNames().size() == 1
+                && 
FlatValuePrinter(recAccessor.getFieldNames().get(0)).equals(schema.getName())) {
+            visit((ARecordVisitablePointable) 
recAccessor.getFieldValues().get(0), new Pair<>(group, schema));
+        } else {
+            visit(recAccessor, new Pair<>(group, schema));
+        }
+
+        return group;
+    }
+
+    @Override
+    public Void visit(AListVisitablePointable accessor, Pair<Group, Type> arg) 
throws HyracksDataException {
+        throw new HyracksDataException("LIST not supported yet");
+    }
+
+    @Override
+    public Void visit(ARecordVisitablePointable accessor, Pair<Group, Type> 
arg) throws HyracksDataException {
+        List<IVisitablePointable> fieldNames = accessor.getFieldNames();
+        List<IVisitablePointable> values = accessor.getFieldValues();
+        Group group = arg.first;
+        Type type = arg.second;
+
+        for (int i = 0; i < fieldNames.size(); i++) {
+            String columnName = FlatValuePrinter(fieldNames.get(i));
+            GroupType groupType = (GroupType) type;
+            if (values.get(i) instanceof ARecordVisitablePointable) {
+                visit((ARecordVisitablePointable) values.get(i),
+                        new Pair<>(group.addGroup(columnName), 
groupType.getType(columnName)));
+            } else if (values.get(i) instanceof AFlatValuePointable) {
+                visit((AFlatValuePointable) values.get(i), new Pair<>(group, 
groupType.getType(columnName)));
+            } else {
+                throw new HyracksDataException("List not supported yet");
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Void visit(AFlatValuePointable accessor, Pair<Group, Type> arg) 
throws HyracksDataException {
+
+        Group group = arg.first;
+        Type type = arg.second;
+        append(group, type.getName(), accessor);
+        return null;
+    }
+
+    private void append(Group group, String fieldName, IVisitablePointable 
value) throws HyracksDataException {
+        byte[] b = value.getByteArray();
+        int s = value.getStartOffset(), l = value.getLength();
+        ATypeTag typeTag = 
EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b[s]);
+
+        final String[] ans = { "" };
+        OutputStream os = new OutputStream() {
+            @Override
+            public void write(int b) throws IOException {
+                char c = (char) b;
+                ans[0] = ans[0] + c;
+            }
+        };
+        PrintStream ps = new PrintStream(os);
+        try {
+            AObjectPrinterFactory.printFlatValue(typeTag, b, s, l, ps);
+        } catch (HyracksDataException e) {
+            throw new RuntimeException(e);
+        }
+
+        switch (typeTag) {
+            case STRING:
+                group.append(fieldName, ans[0].substring(1, ans[0].length() - 
1));
+                break;
+            case TINYINT:
+                group.append(fieldName, Integer.parseInt(ans[0]));
+                break;
+            case SMALLINT:
+                group.append(fieldName, Integer.parseInt(ans[0]));
+                break;
+            case INTEGER:
+                group.append(fieldName, Integer.parseInt(ans[0]));
+                break;
+            case BIGINT:
+                group.append(fieldName, Integer.parseInt(ans[0]));
+                break;
+            case NULL:
+                return;
+            case BOOLEAN:
+                group.append(fieldName, Boolean.parseBoolean(ans[0]));
+                break;
+            case FLOAT:
+                group.append(fieldName, Float.parseFloat(ans[0]));
+                break;
+            case DOUBLE:
+                group.append(fieldName, Double.parseDouble(ans[0]));
+                break;
+            default:
+                throw new HyracksDataException("TYPE " + typeTag + " 
UNEXPECTED");
+        }
+    }
+
+    public MessageType getSchema() {
+        return schema;
+    }
+}

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18200
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I108be151cadbb7989d22f126296280964c4b838f
Gerrit-Change-Number: 18200
Gerrit-PatchSet: 1
Gerrit-Owner: [email protected]
Gerrit-MessageType: newchange

Reply via email to