>From <[email protected]>:
[email protected] has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18200 )
Change subject: [WIP] Copy to S3 in parquet format
......................................................................
[WIP] Copy to S3 in parquet format
Change-Id: I108be151cadbb7989d22f126296280964c4b838f
---
A
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinter.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
M asterixdb/asterix-om/pom.xml
A
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinterFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
A
asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ParquetRecordVisitor.java
6 files changed, 413 insertions(+), 3 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/00/18200/1
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index c0af89f..42db85d 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -82,6 +82,7 @@
public static final String KEY_EXPRESSION = "expression";
public static final String KEY_LOCAL_SOCKET_PATH = "local-socket-path";
public static final String KEY_FORMAT = "format";
+ public static final String SCHEMA_FORMAT = "schema";
public static final String KEY_INCLUDE = "include";
public static final String KEY_EXCLUDE = "exclude";
public static final String KEY_QUOTE = "quote";
@@ -316,7 +317,7 @@
public static final Set<String> WRITER_SUPPORTED_COMPRESSION;
static {
- WRITER_SUPPORTED_FORMATS = Set.of(FORMAT_JSON_LOWER_CASE);
+ WRITER_SUPPORTED_FORMATS = Set.of(FORMAT_JSON_LOWER_CASE,
FORMAT_PARQUET);
WRITER_SUPPORTED_ADAPTERS =
Set.of(ALIAS_LOCALFS_ADAPTER.toLowerCase(),
KEY_ADAPTER_NAME_AWS_S3.toLowerCase());
WRITER_SUPPORTED_COMPRESSION = Set.of(KEY_COMPRESSION_GZIP);
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinter.java
new file mode 100644
index 0000000..d5f5d8e
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinter.java
@@ -0,0 +1,139 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.asterix.external.writer.printer;

import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;

import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;

import org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
import org.apache.asterix.om.pointables.printer.parquet.ParquetRecordVisitor;
import org.apache.asterix.runtime.writer.IExternalPrinter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hyracks.algebricks.data.IPrinter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.util.HadoopStreams;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;

/**
 * An {@link IExternalPrinter} that writes records to the destination stream in Parquet format.
 * Each value handed to {@link #print(IValueReference)} is converted to a Parquet {@link Group}
 * by a {@link ParquetRecordVisitor} built from the user-provided textual schema, then written
 * through a {@link ParquetWriter} backed by the (possibly compressed) stream supplied to
 * {@link #newStream(OutputStream)}.
 */
public class TextualExternalFileParquetPrinter implements IExternalPrinter {
    // TODO(WIP): tiny row-group/page sizes are development values; tune before production use.
    private static final int ROW_GROUP_SIZE = 1024;
    private static final int PAGE_SIZE = 1024;
    private static final int DICTIONARY_PAGE_SIZE = 512;

    private final IPrinter printer;
    private final IExternalFileCompressStreamFactory compressStreamFactory;
    // Declared source type, forwarded to the record visitor (expected to be an IAType).
    private final Object typeInfo;
    // Parquet message-type schema in its textual representation.
    private final String schema;
    private TextualOutputStreamDelegate delegate;
    private PrintStream printStream;
    private ParquetWriter<Group> writer;
    private ParquetRecordVisitor parquetRecordVisitor;

    public TextualExternalFileParquetPrinter(IPrinter printer,
            IExternalFileCompressStreamFactory compressStreamFactory, String schema, Object typeInfo) {
        this.printer = printer;
        this.compressStreamFactory = compressStreamFactory;
        this.schema = schema;
        this.typeInfo = typeInfo;
    }

    @Override
    public void open() throws HyracksDataException {
        printer.init();
    }

    @Override
    public void newStream(OutputStream outputStream) throws HyracksDataException {
        if (printStream != null) {
            // Finish the previous file before starting a new one.
            close();
        }
        delegate = new TextualOutputStreamDelegate(compressStreamFactory.createStream(outputStream));
        printStream = new PrintStream(delegate);
        Configuration conf = new Configuration();
        parquetRecordVisitor = new ParquetRecordVisitor(schema, typeInfo);
        GroupWriteSupport.setSchema(parquetRecordVisitor.getSchema(), conf);
        try {
            writer = ExampleParquetWriter.builder(new PrintStreamOutputFile(printStream))
                    .withCompressionCodec(UNCOMPRESSED).withRowGroupSize(ROW_GROUP_SIZE).withPageSize(PAGE_SIZE)
                    .withDictionaryPageSize(DICTIONARY_PAGE_SIZE).enableDictionaryEncoding().withValidation(false)
                    .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0).withConf(conf).build();
        } catch (IOException e) {
            // Preserve the cause inside the exception type this method already declares.
            throw HyracksDataException.create(e);
        }
    }

    @Override
    public void print(IValueReference value) throws HyracksDataException {
        Group group = parquetRecordVisitor.convertToParquetTuple(value);
        try {
            writer.write(group);
        } catch (IOException e) {
            throw HyracksDataException.create(e);
        }
    }

    @Override
    public void close() throws HyracksDataException {
        if (writer != null) {
            try {
                // Closing the Parquet writer also closes the underlying printStream.
                writer.close();
            } catch (IOException e) {
                throw HyracksDataException.create(e);
            } finally {
                writer = null;
                printStream = null;
            }
            delegate.checkError();
            delegate = null;
        }
    }

    /**
     * Adapts a {@link PrintStream} to Parquet's {@link OutputFile} abstraction so the writer can
     * emit to an arbitrary (e.g. compressed or cloud-bound) stream instead of a local file.
     */
    private static class PrintStreamOutputFile implements OutputFile {
        private final PositionOutputStream pos;

        PrintStreamOutputFile(PrintStream ps) {
            FSDataOutputStream fs = new FSDataOutputStream(ps, new FileSystem.Statistics("test"));
            this.pos = HadoopStreams.wrap(fs);
        }

        @Override
        public PositionOutputStream create(long blockSizeHint) throws IOException {
            return pos;
        }

        @Override
        public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
            return pos;
        }

        @Override
        public boolean supportsBlockSize() {
            return false;
        }

        @Override
        public long defaultBlockSize() {
            // 32 MiB hint; the wrapped stream has no real block size.
            return 33554432L;
        }
    }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinterFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinterFactory.java
new file mode 100644
index 0000000..7c5aa2c
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinterFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer;
+
+import
org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.hyracks.algebricks.data.IPrinterFactory;
+
+public class TextualExternalFileParquetPrinterFactory extends
TextualExternalPrinterFactory {
+ private static final long serialVersionUID = 8971234908711234L;
+ private final IExternalFileCompressStreamFactory compressStreamFactory;
+ private final String schema;
+
+ Object typeInfo;
+
+ public TextualExternalFileParquetPrinterFactory(IPrinterFactory
printerFactory,
+ IExternalFileCompressStreamFactory compressStreamFactory, String
schema, Object typeInfo) {
+ super(printerFactory);
+ this.compressStreamFactory = compressStreamFactory;
+ this.schema = schema;
+ this.typeInfo = typeInfo;
+ }
+
+ @Override
+ public IExternalPrinter createPrinter() {
+ return new
TextualExternalFileParquetPrinter(printerFactory.createPrinter(),
compressStreamFactory, schema,
+ typeInfo);
+ }
+}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index 9253a48..97f8883 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -28,6 +28,7 @@
import
org.apache.asterix.external.writer.compressor.GzipExternalFileCompressStreamFactory;
import
org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
import
org.apache.asterix.external.writer.compressor.NoOpExternalFileCompressStreamFactory;
+import
org.apache.asterix.external.writer.printer.TextualExternalFileParquetPrinterFactory;
import
org.apache.asterix.external.writer.printer.TextualExternalFilePrinterFactory;
import org.apache.asterix.formats.nontagged.CleanJSONPrinterFactoryProvider;
import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
@@ -110,7 +111,8 @@
String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
// Only JSON and Parquet are supported for now
- if
(!ExternalDataConstants.FORMAT_JSON_LOWER_CASE.equalsIgnoreCase(format)) {
+ if
(!ExternalDataConstants.FORMAT_JSON_LOWER_CASE.equalsIgnoreCase(format)
+ &&
!ExternalDataConstants.FORMAT_PARQUET.equalsIgnoreCase(format)) {
throw new UnsupportedOperationException("Unsupported format " +
format);
}
@@ -119,7 +121,21 @@
STREAM_COMPRESSORS.getOrDefault(compression,
NoOpExternalFileCompressStreamFactory.INSTANCE);
IPrinterFactory printerFactory =
CleanJSONPrinterFactoryProvider.INSTANCE.getPrinterFactory(sourceType);
- return new TextualExternalFilePrinterFactory(printerFactory,
compressStreamFactory);
+
+ switch (format) {
+ case ExternalDataConstants.FORMAT_JSON_LOWER_CASE:
+ return new TextualExternalFilePrinterFactory(printerFactory,
compressStreamFactory);
+ case ExternalDataConstants.FORMAT_PARQUET:
+
+ if
(!configuration.containsKey(ExternalDataConstants.SCHEMA_FORMAT)) {
+ throw new UnsupportedOperationException("Schema not
provided for parquet");
+ }
+ String schema =
configuration.get(ExternalDataConstants.SCHEMA_FORMAT);
+ return new
TextualExternalFileParquetPrinterFactory(printerFactory, compressStreamFactory,
schema,
+ sourceType);
+ default:
+ throw new UnsupportedOperationException("Unsupported format "
+ format);
+ }
}
private static String getFormat(Map<String, String> configuration) {
diff --git a/asterixdb/asterix-om/pom.xml b/asterixdb/asterix-om/pom.xml
index 6db4840..19fbe3a 100644
--- a/asterixdb/asterix-om/pom.xml
+++ b/asterixdb/asterix-om/pom.xml
@@ -160,5 +160,14 @@
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-column</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>3.3.6</version>
+ </dependency>
</dependencies>
</project>
diff --git
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ParquetRecordVisitor.java
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ParquetRecordVisitor.java
new file mode 100644
index 0000000..f1f2df9
--- /dev/null
+++
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ParquetRecordVisitor.java
@@ -0,0 +1,191 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.asterix.om.pointables.printer.parquet;

import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;

import java.io.OutputStream;
import java.io.PrintStream;
import java.util.List;

import org.apache.asterix.dataflow.data.nontagged.printers.json.clean.AObjectPrinterFactory;
import org.apache.asterix.om.pointables.AFlatValuePointable;
import org.apache.asterix.om.pointables.AListVisitablePointable;
import org.apache.asterix.om.pointables.ARecordVisitablePointable;
import org.apache.asterix.om.pointables.PointableAllocator;
import org.apache.asterix.om.pointables.base.IVisitablePointable;
import org.apache.asterix.om.pointables.visitor.IVisitablePointableVisitor;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.EnumDeserializer;
import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

/**
 * Converts AsterixDB visitable pointables into Parquet {@link Group} tuples according to a
 * user-supplied Parquet message-type schema. Nested records are supported; lists are not yet.
 */
public class ParquetRecordVisitor implements IVisitablePointableVisitor<Void, Pair<Group, Type>> {

    private final MessageType schema;
    // Declared source type of the records to convert (expected to be an ARecordType).
    private final Object typeInfo;

    /**
     * @param schemaString Parquet message-type schema in its textual representation
     * @param typeInfo     declared source type (expected to be an {@link ARecordType})
     * @throws HyracksDataException if the schema string cannot be parsed
     */
    public ParquetRecordVisitor(String schemaString, Object typeInfo) throws HyracksDataException {
        try {
            this.schema = parseMessageType(schemaString);
        } catch (Exception e) {
            throw new HyracksDataException("Illegal Parquet Schema provided", e);
        }
        this.typeInfo = typeInfo;
    }

    /**
     * Renders a flat value in its clean-JSON textual form. For strings, the surrounding quotes
     * emitted by the JSON printer are stripped.
     * NOTE(review): name kept as-is (non-camelCase) since it is part of the public interface.
     */
    public static String FlatValuePrinter(IVisitablePointable value) {
        byte[] b = value.getByteArray();
        int s = value.getStartOffset();
        int l = value.getLength();
        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b[s]);
        String printed;
        try {
            printed = printToString(typeTag, b, s, l);
        } catch (HyracksDataException e) {
            throw new RuntimeException(e);
        }
        // The clean-JSON printer quotes strings; drop the surrounding quotes.
        // TODO(review): embedded JSON escape sequences are not un-escaped.
        return typeTag == ATypeTag.STRING ? printed.substring(1, printed.length() - 1) : printed;
    }

    /**
     * Prints a flat value through {@link AObjectPrinterFactory} and returns the printed text.
     * Uses a StringBuilder sink instead of per-character String concatenation (the previous
     * approach was O(n^2) in the printed length).
     */
    private static String printToString(ATypeTag typeTag, byte[] b, int s, int l) throws HyracksDataException {
        final StringBuilder sb = new StringBuilder();
        // Collect the printer's byte output; assumes the printed form is one byte per
        // character (ASCII-compatible) -- TODO confirm for non-ASCII string data.
        OutputStream os = new OutputStream() {
            @Override
            public void write(int c) {
                sb.append((char) c);
            }
        };
        PrintStream ps = new PrintStream(os);
        AObjectPrinterFactory.printFlatValue(typeTag, b, s, l, ps);
        ps.flush();
        return sb.toString();
    }

    /**
     * Converts a serialized record into a Parquet {@link Group} matching the schema. If the
     * record has exactly one field whose name equals the schema's root name, that field's value
     * is used as the tuple root (un-nesting the named wrapper record).
     *
     * @throws HyracksDataException if the declared type is not an OBJECT type
     */
    public Group convertToParquetTuple(IValueReference value) throws HyracksDataException {
        // instanceof is null-safe; the original code would NPE on a null typeInfo.
        if (!(typeInfo instanceof IAType) || ((IAType) typeInfo).getTypeTag() != ATypeTag.OBJECT) {
            throw new HyracksDataException("Type Unsupported for parquet printing");
        }
        ARecordType recType = (ARecordType) typeInfo;

        // NOTE(review): allocating a fresh PointableAllocator per tuple; consider reusing one
        // per visitor instance if allocator reuse is safe -- confirm.
        final PointableAllocator allocator = new PointableAllocator();
        final ARecordVisitablePointable recAccessor = allocator.allocateRecordValue(recType);
        recAccessor.set(value.getByteArray(), value.getStartOffset(), value.getLength());

        Group group = new SimpleGroupFactory(schema).newGroup();
        List<IVisitablePointable> fieldNames = recAccessor.getFieldNames();
        if (fieldNames.size() == 1 && FlatValuePrinter(fieldNames.get(0)).equals(schema.getName())) {
            // assumes the wrapper field holds a record value -- TODO confirm against callers
            visit((ARecordVisitablePointable) recAccessor.getFieldValues().get(0), new Pair<>(group, schema));
        } else {
            visit(recAccessor, new Pair<>(group, schema));
        }
        return group;
    }

    @Override
    public Void visit(AListVisitablePointable accessor, Pair<Group, Type> arg) throws HyracksDataException {
        throw new HyracksDataException("LIST not supported yet");
    }

    @Override
    public Void visit(ARecordVisitablePointable accessor, Pair<Group, Type> arg) throws HyracksDataException {
        List<IVisitablePointable> fieldNames = accessor.getFieldNames();
        List<IVisitablePointable> values = accessor.getFieldValues();
        Group group = arg.first;
        GroupType groupType = (GroupType) arg.second;

        for (int i = 0; i < fieldNames.size(); i++) {
            String columnName = FlatValuePrinter(fieldNames.get(i));
            IVisitablePointable fieldValue = values.get(i);
            if (fieldValue instanceof ARecordVisitablePointable) {
                // Nested record: create a sub-group and recurse with the matching schema type.
                visit((ARecordVisitablePointable) fieldValue,
                        new Pair<>(group.addGroup(columnName), groupType.getType(columnName)));
            } else if (fieldValue instanceof AFlatValuePointable) {
                visit((AFlatValuePointable) fieldValue, new Pair<>(group, groupType.getType(columnName)));
            } else {
                throw new HyracksDataException("List not supported yet");
            }
        }
        return null;
    }

    @Override
    public Void visit(AFlatValuePointable accessor, Pair<Group, Type> arg) throws HyracksDataException {
        append(arg.first, arg.second.getName(), accessor);
        return null;
    }

    /**
     * Appends a flat value to {@code group} under {@code fieldName}, converting the printed
     * textual form to the Java type Parquet expects for the AsterixDB type tag.
     */
    private void append(Group group, String fieldName, IVisitablePointable value) throws HyracksDataException {
        byte[] b = value.getByteArray();
        int s = value.getStartOffset();
        int l = value.getLength();
        ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b[s]);
        String printed = printToString(typeTag, b, s, l);

        switch (typeTag) {
            case STRING:
                // Strip the quotes added by the clean-JSON printer.
                group.append(fieldName, printed.substring(1, printed.length() - 1));
                break;
            case TINYINT:
            case SMALLINT:
            case INTEGER:
                group.append(fieldName, Integer.parseInt(printed));
                break;
            case BIGINT:
                // Fixed: Integer.parseInt overflowed/threw for values outside int range;
                // bigint maps to Parquet int64, so parse and append as long.
                group.append(fieldName, Long.parseLong(printed));
                break;
            case NULL:
                // Null is encoded as an absent value in Parquet; append nothing.
                return;
            case BOOLEAN:
                group.append(fieldName, Boolean.parseBoolean(printed));
                break;
            case FLOAT:
                group.append(fieldName, Float.parseFloat(printed));
                break;
            case DOUBLE:
                group.append(fieldName, Double.parseDouble(printed));
                break;
            default:
                throw new HyracksDataException("TYPE " + typeTag + " UNEXPECTED");
        }
    }

    public MessageType getSchema() {
        return schema;
    }
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18200
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I108be151cadbb7989d22f126296280964c4b838f
Gerrit-Change-Number: 18200
Gerrit-PatchSet: 1
Gerrit-Owner: [email protected]
Gerrit-MessageType: newchange