From <[email protected]>:
[email protected] has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18209 )
Change subject: [WIP] Support COPY TO in parquet
......................................................................
[WIP] Support COPY TO in parquet
Change-Id: I40dc16969e66af09cde04b460f441af666b39d51
---
A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinter.java
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
A asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ObjectWriteSupport.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
M asterixdb/asterix-om/pom.xml
A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinterFactory.java
A asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/AsterixParquetWriter.java
A asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ParquetRecordVisitorUtils.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
A asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ParquetRecordLazyVisitor.java
10 files changed, 861 insertions(+), 4 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/09/18209/1
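
For context, a rough sketch of how this WIP change might be exercised once complete. The writer option names "format" and "schema" come from this patch (ExternalDataConstants.KEY_FORMAT and SCHEMA_FORMAT); the dataset name, the S3 target, and the rest of the COPY TO syntax shown here follow the existing JSON writer path and are illustrative only, and the schema value is any Parquet message type accepted by MessageTypeParser.parseMessageType:

  COPY Customers
  TO S3 PATH ("copy-out/customers")
  WITH {
    "format": "parquet",
    "schema": "message asterix_record { required int64 id; optional binary name (UTF8); }"
  };
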
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 1e90f98..fd88b86 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -80,7 +80,7 @@
<compilation-unit name="supported-adapter-format-compression">
<output-dir compare="Text">supported-adapter-format-compression</output-dir>
<expected-error>ASX1188: Unsupported writing adapter 'AZUREBLOB'. Supported adapters: [localfs, s3]</expected-error>
- <expected-error>ASX1189: Unsupported writing format 'csv'. Supported formats: [json]</expected-error>
+ <expected-error>ASX1189: Unsupported writing format 'csv'. Supported formats: [json, parquet]</expected-error>
<expected-error>ASX1096: Unknown compression scheme rar. Supported schemes are [gzip]</expected-error>
</compilation-unit>
</test-case>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 79252ad..86c8ef3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -82,6 +82,7 @@
public static final String KEY_EXPRESSION = "expression";
public static final String KEY_LOCAL_SOCKET_PATH = "local-socket-path";
public static final String KEY_FORMAT = "format";
+ public static final String SCHEMA_FORMAT = "schema";
public static final String KEY_INCLUDE = "include";
public static final String KEY_EXCLUDE = "exclude";
public static final String KEY_QUOTE = "quote";
@@ -317,7 +318,7 @@
public static final Set<String> WRITER_SUPPORTED_COMPRESSION;
static {
- WRITER_SUPPORTED_FORMATS = Set.of(FORMAT_JSON_LOWER_CASE);
+ WRITER_SUPPORTED_FORMATS = Set.of(FORMAT_JSON_LOWER_CASE, FORMAT_PARQUET);
WRITER_SUPPORTED_ADAPTERS = Set.of(ALIAS_LOCALFS_ADAPTER.toLowerCase(), KEY_ADAPTER_NAME_AWS_S3.toLowerCase());
WRITER_SUPPORTED_COMPRESSION = Set.of(KEY_COMPRESSION_GZIP);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinter.java
new file mode 100644
index 0000000..59783e5
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinter.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.external.writer.printer;
+
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+
+import org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
+import org.apache.asterix.om.pointables.printer.parquet.AsterixParquetWriter;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hyracks.algebricks.data.IPrinter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.util.HadoopStreams;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+import org.apache.parquet.schema.MessageType;
+
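+// Writes Asterix records as Parquet: each IValueReference handed to print() is forwarded to an
+// AsterixParquetWriter that serializes it against the user-provided schema, while the
+// (optionally compressed) output stream is adapted into a Parquet OutputFile below.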
+public class TextualExternalFileParquetPrinter implements IExternalPrinter {
+ private final IPrinter printer;
+ private final IExternalFileCompressStreamFactory compressStreamFactory;
+ private final Object typeInfo;
+ private TextualOutputStreamDelegate delegate;
+ private PrintStream printStream;
+ private String schemaString;
+ private MessageType schema;
+ private TestOutputFile testOutputFile;
+ private ParquetWriter<IValueReference> writer;
+
+ public TextualExternalFileParquetPrinter(IPrinter printer, IExternalFileCompressStreamFactory compressStreamFactory, String schemaString, Object typeInfo) {
+ this.printer = printer;
+ this.compressStreamFactory = compressStreamFactory;
+ this.schemaString = schemaString;
+ this.typeInfo = typeInfo;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ printer.init();
+ }
+
+ @Override
+ public void newStream(OutputStream outputStream) throws HyracksDataException {
+ if (printStream != null) {
+ close();
+ }
+ delegate = new TextualOutputStreamDelegate(compressStreamFactory.createStream(outputStream));
+ printStream = new PrintStream(delegate);
+ testOutputFile = new TestOutputFile(printStream);
+
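+ // Parse the user-supplied Parquet message type up front so a malformed schema fails fast.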
+ try {
+ this.schema = parseMessageType(schemaString);
+ } catch (Exception e) {
+ throw new HyracksDataException("Illegal Parquet Schema provided", e);
+ }
+
+ Configuration conf = new Configuration();
+
+ try {
+ writer = AsterixParquetWriter.builder(testOutputFile).withCompressionCodec(UNCOMPRESSED).withType(schema)
+ .withTypeInfo(typeInfo).withRowGroupSize(1024).withPageSize(1024).withDictionaryPageSize(512)
+ .enableDictionaryEncoding().withValidation(false)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0).withConf(conf).build();
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+
+ }
+
+ @Override
+ public void print(IValueReference value) throws HyracksDataException {
+ try {
+ this.writer.write(value);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (this.writer != null) {
+ try {
+ // Closing the writer also closes the underlying printStream.
+ this.writer.close();
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ delegate.checkError();
+ delegate = null;
+ }
+ }
+
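+ // Adapts the PrintStream into a Parquet OutputFile by wrapping it in an FSDataOutputStream,
+ // so the ParquetWriter writes to the (possibly compressed) stream instead of a file on disk.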
+ private static class TestOutputFile implements OutputFile {
+ private final PrintStream ps;
+
+ private final PositionOutputStream pos;
+
+ public TestOutputFile(PrintStream ps) {
+ this.ps = ps;
+ FSDataOutputStream fs = new FSDataOutputStream(ps, new FileSystem.Statistics("test"));
+ this.pos = HadoopStreams.wrap(fs);
+ }
+
+ @Override
+ public PositionOutputStream create(long blockSizeHint) throws IOException {
+ return pos;
+ }
+
+ @Override
+ public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
+ return pos;
+ }
+
+ @Override
+ public boolean supportsBlockSize() {
+ return false;
+ }
+
+ @Override
+ public long defaultBlockSize() {
+ return 33554432L;
+ }
+ }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinterFactory.java
new file mode 100644
index 0000000..86fb2a4
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/TextualExternalFileParquetPrinterFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.writer.printer;
+
+import org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
+import org.apache.asterix.runtime.writer.IExternalPrinter;
+import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
+import org.apache.hyracks.algebricks.data.IPrinterFactory;
+
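+// Serializable factory that carries the schema string and source type to the runtime, where it
+// creates TextualExternalFileParquetPrinter instances.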
+public class TextualExternalFileParquetPrinterFactory implements IExternalPrinterFactory {
+ private static final long serialVersionUID = 8971234908711234L;
+ private final IExternalFileCompressStreamFactory compressStreamFactory;
+ private final String schema;
+ private final IPrinterFactory printerFactory;
+
+ Object typeInfo;
+
+ public TextualExternalFileParquetPrinterFactory(IPrinterFactory printerFactory,
+ IExternalFileCompressStreamFactory compressStreamFactory, String schema, Object typeInfo) {
+ this.printerFactory = printerFactory;
+ this.compressStreamFactory = compressStreamFactory;
+ this.schema = schema;
+ this.typeInfo = typeInfo;
+ }
+
+ @Override
+ public IExternalPrinter createPrinter() {
+ return new TextualExternalFileParquetPrinter(printerFactory.createPrinter(), compressStreamFactory, schema, typeInfo);
+ }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index cf51200..a39186d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -30,6 +30,7 @@
import org.apache.asterix.external.writer.compressor.GzipExternalFileCompressStreamFactory;
import org.apache.asterix.external.writer.compressor.IExternalFileCompressStreamFactory;
import org.apache.asterix.external.writer.compressor.NoOpExternalFileCompressStreamFactory;
+import org.apache.asterix.external.writer.printer.TextualExternalFileParquetPrinterFactory;
import org.apache.asterix.external.writer.printer.TextualExternalFilePrinterFactory;
import org.apache.asterix.formats.nontagged.CleanJSONPrinterFactoryProvider;
import org.apache.asterix.runtime.writer.ExternalFileWriterConfiguration;
@@ -108,7 +109,8 @@
String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
// Only JSON is supported for now
- if (!ExternalDataConstants.FORMAT_JSON_LOWER_CASE.equalsIgnoreCase(format)) {
+ if (!ExternalDataConstants.FORMAT_JSON_LOWER_CASE.equalsIgnoreCase(format)
+ && !ExternalDataConstants.FORMAT_PARQUET.equalsIgnoreCase(format)) {
throw new UnsupportedOperationException("Unsupported format " + format);
}
@@ -116,7 +118,21 @@
IExternalFileCompressStreamFactory compressStreamFactory = createCompressionStreamFactory(appCtx, compression, configuration);
IPrinterFactory printerFactory = CleanJSONPrinterFactoryProvider.INSTANCE.getPrinterFactory(sourceType);
- return new TextualExternalFilePrinterFactory(printerFactory, compressStreamFactory);
+
+ switch (format) {
+ case ExternalDataConstants.FORMAT_JSON_LOWER_CASE:
+ return new TextualExternalFilePrinterFactory(printerFactory, compressStreamFactory);
+ case ExternalDataConstants.FORMAT_PARQUET:
+ if (!configuration.containsKey(ExternalDataConstants.SCHEMA_FORMAT)) {
+ throw new UnsupportedOperationException("Schema not provided for parquet");
+ }
+ String schema = configuration.get(ExternalDataConstants.SCHEMA_FORMAT);
+ return new TextualExternalFileParquetPrinterFactory(printerFactory, compressStreamFactory, schema, sourceType);
+ default:
+ throw new UnsupportedOperationException("Unsupported format " + format);
+ }
}
private static String getFormat(Map<String, String> configuration) {
diff --git a/asterixdb/asterix-om/pom.xml b/asterixdb/asterix-om/pom.xml
index 6db4840..97b3946 100644
--- a/asterixdb/asterix-om/pom.xml
+++ b/asterixdb/asterix-om/pom.xml
@@ -160,5 +160,18 @@
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-column</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>3.3.6</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/AsterixParquetWriter.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/AsterixParquetWriter.java
new file mode 100644
index 0000000..57cc909
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/AsterixParquetWriter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.pointables.printer.parquet;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.schema.MessageType;
+
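+// Thin ParquetWriter<IValueReference> whose Builder plugs in ObjectWriteSupport, so Asterix
+// values can be written directly through the Parquet record consumer.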
+public class AsterixParquetWriter extends ParquetWriter<IValueReference> {
+ public static Builder builder(Path file) {
+ return new Builder(file);
+ }
+
+ public static Builder builder(OutputFile file) {
+ return new Builder(file);
+ }
+ AsterixParquetWriter(Path file, WriteSupport<IValueReference> writeSupport,
+ CompressionCodecName compressionCodecName, int blockSize, int pageSize, boolean enableDictionary,
+ boolean enableValidation, ParquetProperties.WriterVersion writerVersion, Configuration conf) throws IOException {
+ super(file, writeSupport, compressionCodecName, blockSize, pageSize, pageSize, enableDictionary, enableValidation, writerVersion, conf);
+ }
+
+ public static class Builder extends ParquetWriter.Builder<IValueReference, Builder> {
+ private MessageType type;
+ private Object typeInfo;
+ private Map<String, String> extraMetaData;
+
+ private Builder(Path file) {
+ super(file);
+ this.type = null;
+ this.extraMetaData = new HashMap<>();
+ }
+
+ private Builder(OutputFile file) {
+ super(file);
+ this.type = null;
+ this.extraMetaData = new HashMap<>();
+ }
+
+ public Builder withType(MessageType type) {
+ this.type = type;
+ return this;
+ }
+
+ public Builder withTypeInfo(Object typeInfo) {
+ this.typeInfo = typeInfo;
+ return this;
+ }
+
+ public Builder withExtraMetaData(Map<String, String> extraMetaData) {
+ this.extraMetaData = extraMetaData;
+ return this;
+ }
+
+ protected Builder self() {
+ return this;
+ }
+
+ protected WriteSupport<IValueReference> getWriteSupport(Configuration conf) {
+ return new ObjectWriteSupport(this.type, this.typeInfo, this.extraMetaData);
+ }
+ }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ObjectWriteSupport.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ObjectWriteSupport.java
new file mode 100644
index 0000000..db88c4c
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ObjectWriteSupport.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.pointables.printer.parquet;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.MessageType;
+
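+// WriteSupport that hands each incoming IValueReference to ParquetRecordLazyVisitor, which
+// streams the record into the RecordConsumer using the configured message type.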
+public class ObjectWriteSupport extends WriteSupport<IValueReference> {
+ private MessageType schema;
+
+ private RecordConsumer recordConsumer;
+ private Map<String, String> extraMetaData;
+ ParquetRecordLazyVisitor parquetRecordLazyVisitor;
+
+ public ObjectWriteSupport(MessageType schema, Object typeInfo, Map<String, String> extraMetaData) {
+ this.schema = schema;
+ this.extraMetaData = extraMetaData;
+ parquetRecordLazyVisitor = new ParquetRecordLazyVisitor(schema, typeInfo);
+ }
+
+ public String getName() {
+ return "asterix";
+ }
+
+ public WriteSupport.WriteContext init(Configuration configuration) {
+ return new WriteSupport.WriteContext(this.schema, this.extraMetaData);
+ }
+
+ public void prepareForWrite(RecordConsumer recordConsumer) {
+ this.recordConsumer = recordConsumer;
+ }
+
+ @Override
+ public void write(IValueReference valueReference) {
+ try {
+ parquetRecordLazyVisitor.consumeRecord(valueReference, recordConsumer, schema);
+ } catch (HyracksDataException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ParquetRecordLazyVisitor.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ParquetRecordLazyVisitor.java
new file mode 100644
index 0000000..7f045b5
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ParquetRecordLazyVisitor.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.pointables.printer.parquet;
+
+import static org.apache.asterix.om.pointables.printer.parquet.ParquetRecordVisitorUtils.*;
+
+import org.apache.asterix.om.lazy.AbstractLazyVisitablePointable;
+import org.apache.asterix.om.lazy.AbstractListLazyVisitablePointable;
+import org.apache.asterix.om.lazy.FlatLazyVisitablePointable;
+import org.apache.asterix.om.lazy.ILazyVisitablePointableVisitor;
+import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
+import org.apache.asterix.om.lazy.TypedRecordLazyVisitablePointable;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
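+// Walks a lazy visitable record and mirrors it onto the Parquet RecordConsumer: nested objects
+// and collections become Parquet groups, lists use the list/element group layout, and flat
+// values are delegated to ParquetRecordVisitorUtils.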
+public class ParquetRecordLazyVisitor implements ILazyVisitablePointableVisitor<Void, Pair<RecordConsumer, Type>> {
+
+ private final MessageType schema;
+ private Object typeInfo;
+
+ public ParquetRecordLazyVisitor(MessageType schema, Object typeInfo) {
+ this.schema = schema;
+ this.typeInfo = typeInfo;
+ }
+
+ public MessageType getSchema() {
+ return schema;
+ }
+
+ @Override
+ public Void visit(RecordLazyVisitablePointable pointable, Pair<RecordConsumer, Type> arg) throws HyracksDataException {
+ RecordConsumer recordConsumer = arg.first;
+ Type type = arg.second;
+ GroupType groupType = (GroupType) type;
+
+ for (int i = 0; i < pointable.getNumberOfChildren(); i++) {
+ pointable.nextChild();
+ String columnName = stringify(pointable.getFieldName());
+ AbstractLazyVisitablePointable child = pointable.getChildVisitablePointable();
+
+ switch (child.getTypeTag()) {
+ case OBJECT:
+ case ARRAY:
+ case MULTISET:
+ recordConsumer.startField(columnName, groupType.getFieldIndex(columnName));
+ recordConsumer.startGroup();
+ child.accept(this, new Pair<>(recordConsumer, groupType.getType(columnName)));
+ recordConsumer.endGroup();
+ recordConsumer.endField(columnName, groupType.getFieldIndex(columnName));
+ break;
+ default:
+ recordConsumer.startField(columnName, groupType.getFieldIndex(columnName));
+ child.accept(this, new Pair<>(recordConsumer, groupType.getType(columnName)));
+ recordConsumer.endField(columnName, groupType.getFieldIndex(columnName));
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Void visit(AbstractListLazyVisitablePointable pointable, Pair<RecordConsumer, Type> arg) throws HyracksDataException {
+ RecordConsumer recordConsumer = arg.first;
+ Type type = arg.second;
+ for (int i = 0; i < pointable.getNumberOfChildren(); i++) {
+ pointable.nextChild();
+ GroupType groupType = (GroupType) type;
+ AbstractLazyVisitablePointable child = pointable.getChildVisitablePointable();
+
+ switch (child.getTypeTag()) {
+ case OBJECT:
+ case ARRAY:
+ case MULTISET:
+ recordConsumer.startField(LIST_FIELD, groupType.getFieldIndex(LIST_FIELD));
+ recordConsumer.startGroup();
+
+ recordConsumer.startField(ELEMENT_FIELD, groupType.getType(LIST_FIELD).asGroupType().getFieldIndex(ELEMENT_FIELD));
+ recordConsumer.startGroup();
+
+ child.accept(this, new Pair<>(recordConsumer, groupType.getType(LIST_FIELD).asGroupType().getType(ELEMENT_FIELD)));
+
+ recordConsumer.endGroup();
+ recordConsumer.endField(ELEMENT_FIELD, groupType.getType(LIST_FIELD).asGroupType().getFieldIndex(ELEMENT_FIELD));
+
+ recordConsumer.endGroup();
+ recordConsumer.endField(LIST_FIELD, groupType.getFieldIndex(LIST_FIELD));
+
+ break;
+ default:
+ recordConsumer.startField(LIST_FIELD, groupType.getFieldIndex(LIST_FIELD));
+ recordConsumer.startField(ELEMENT_FIELD, groupType.getType(LIST_FIELD).asGroupType().getFieldIndex(ELEMENT_FIELD));
+
+ child.accept(this, new Pair<>(recordConsumer, groupType.getType(LIST_FIELD).asGroupType().getType(ELEMENT_FIELD)));
+
+ recordConsumer.endField(ELEMENT_FIELD, groupType.getType(LIST_FIELD).asGroupType().getFieldIndex(ELEMENT_FIELD));
+ recordConsumer.endField(LIST_FIELD, groupType.getFieldIndex(LIST_FIELD));
+
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Void visit(FlatLazyVisitablePointable pointable, Pair<RecordConsumer, Type> arg) throws HyracksDataException {
+ RecordConsumer recordConsumer = arg.first;
+ Type type = arg.second;
+ addValueToColumn(recordConsumer, pointable, type.asPrimitiveType());
+ return null;
+ }
+
+ public void consumeRecord(IValueReference valueReference, RecordConsumer recordConsumer, MessageType schema) throws HyracksDataException {
+
+ IAType type = (IAType) typeInfo;
+ if (type.getTypeTag() != ATypeTag.OBJECT) {
+ throw new HyracksDataException("Unsupported type for Parquet printing");
+ }
+ ARecordType recType = (ARecordType) type;
+ RecordLazyVisitablePointable rec = new TypedRecordLazyVisitablePointable(recType);
+
+ rec.set(valueReference);
+ recordConsumer.startMessage();
+ rec.accept(this, new Pair<>(recordConsumer, schema));
+ recordConsumer.endMessage();
+ }
+
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ParquetRecordVisitorUtils.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ParquetRecordVisitorUtils.java
new file mode 100644
index 0000000..c2335e0
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/parquet/ParquetRecordVisitorUtils.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.om.pointables.printer.parquet;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import org.apache.asterix.om.lazy.FlatLazyVisitablePointable;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ByteArrayAccessibleInputStream;
+import org.apache.hyracks.util.string.UTF8StringReader;
+import org.apache.hyracks.util.string.UTF8StringUtil;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.PrimitiveType;
+
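+// Helpers for the flat (primitive) case: decode the tagged Asterix value and forward it to the
+// RecordConsumer, casting numerics to the target Parquet primitive where possible and rejecting
+// conversions that are not supported.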
+public class ParquetRecordVisitorUtils {
+
+ public static final String LIST_FIELD = "list";
+ public static final String ELEMENT_FIELD = "element";
+
+ public static String stringify(IValueReference valueReference) throws HyracksDataException {
+ ByteArrayAccessibleInputStream in = new ByteArrayAccessibleInputStream(new byte[] {}, 0, 0);
+ DataInputStream dataIn = new DataInputStream(in);
+ UTF8StringReader utf8Reader = new UTF8StringReader();
+
+ in.setContent(valueReference.getByteArray(), valueReference.getStartOffset(), valueReference.getLength());
+ try {
+ return UTF8StringUtil.readUTF8(dataIn, utf8Reader);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ public static void addValueToColumn(RecordConsumer recordConsumer, FlatLazyVisitablePointable pointable, PrimitiveType type) throws HyracksDataException {
+
+ ATypeTag typeTag = pointable.getTypeTag();
+
+ final byte[] b = pointable.getByteArray();
+ final int s, l;
+
+ if (pointable.isTagged()) {
+ s = pointable.getStartOffset() + 1;
+ l = pointable.getLength() - 1;
+ } else {
+ s = pointable.getStartOffset();
+ l = pointable.getLength();
+ }
+
+ VoidPointable voidPointable = VoidPointable.FACTORY.createPointable();
+ voidPointable.set(b, s, l);
+
+ PrimitiveType.PrimitiveTypeName primitiveTypeName = type.getPrimitiveTypeName();
+
+ switch (typeTag) {
+ case TINYINT:
+ byte tinyIntValue = AInt8SerializerDeserializer.getByte(b, s);
+ switch (primitiveTypeName) {
+ case INT32:
+ recordConsumer.addInteger(tinyIntValue);
+ break;
+ case INT64:
+ recordConsumer.addLong(tinyIntValue);
+ break;
+ case FLOAT:
+ recordConsumer.addFloat(tinyIntValue);
+ break;
+ case DOUBLE:
+ recordConsumer.addDouble(tinyIntValue);
+ break;
+ case BOOLEAN:
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ case INT96:
+ default:
+ throw new HyracksDataException("Typecast impossible from " + typeTag + " to " + primitiveTypeName);
+ }
+ break;
+ case SMALLINT:
+ short smallIntValue = AInt16SerializerDeserializer.getShort(b, s);
+ switch (primitiveTypeName) {
+ case INT32:
+ recordConsumer.addInteger(smallIntValue);
+ break;
+ case INT64:
+ recordConsumer.addLong(smallIntValue);
+ break;
+ case FLOAT:
+ recordConsumer.addFloat(smallIntValue);
+ break;
+ case DOUBLE:
+ recordConsumer.addDouble(smallIntValue);
+ break;
+ case BOOLEAN:
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ case INT96:
+ default:
+ throw new HyracksDataException("Typecast impossible from " + typeTag + " to " + primitiveTypeName);
+ }
+ break;
+ case INTEGER:
+ int intValue = AInt32SerializerDeserializer.getInt(b, s);
+ switch (primitiveTypeName) {
+ case INT32:
+ recordConsumer.addInteger(intValue);
+ break;
+ case INT64:
+ recordConsumer.addLong(intValue);
+ break;
+ case FLOAT:
+ recordConsumer.addFloat(intValue);
+ break;
+ case DOUBLE:
+ recordConsumer.addDouble(intValue);
+ break;
+ case BOOLEAN:
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ case INT96:
+ default:
+ throw new HyracksDataException("Typecast impossible from " + typeTag + " to " + primitiveTypeName);
+ }
+ break;
+ case BIGINT:
+ long bigIntValue = AInt64SerializerDeserializer.getLong(b, s);
+ switch (primitiveTypeName) {
+ case INT64:
+ recordConsumer.addLong(bigIntValue);
+ break;
+ case FLOAT:
+ recordConsumer.addFloat(bigIntValue);
+ break;
+ case DOUBLE:
+ recordConsumer.addDouble(bigIntValue);
+ break;
+ case INT32:
+ case BOOLEAN:
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ case INT96:
+ default:
+ throw new HyracksDataException("Typecast impossible from " + typeTag + " to " + primitiveTypeName);
+ }
+ break;
+ case FLOAT:
+ float floatValue = AFloatSerializerDeserializer.getFloat(b, s);
+ switch (primitiveTypeName) {
+ case INT32:
+ recordConsumer.addInteger((int) floatValue);
+ break;
+ case INT64:
+ recordConsumer.addLong((long) floatValue);
+ break;
+ case FLOAT:
+ recordConsumer.addFloat(floatValue);
+ break;
+ case DOUBLE:
+ recordConsumer.addDouble(floatValue);
+ break;
+ case BOOLEAN:
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ case INT96:
+ default:
+ throw new HyracksDataException("Typecast impossible from " + typeTag + " to " + primitiveTypeName);
+ }
+ break;
+ case DOUBLE:
+ double doubleValue = ADoubleSerializerDeserializer.getDouble(b, s);
+ switch (primitiveTypeName) {
+ case INT32:
+ recordConsumer.addInteger((int) doubleValue);
+ break;
+ case INT64:
+ recordConsumer.addLong((long) doubleValue);
+ break;
+ case FLOAT:
+ recordConsumer.addFloat((float) doubleValue);
+ break;
+ case DOUBLE:
+ recordConsumer.addDouble(doubleValue);
+ break;
+ case BOOLEAN:
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ case INT96:
+ default:
+ throw new HyracksDataException("Typecast impossible from " + typeTag + " to " + primitiveTypeName);
+ }
+ break;
+ case STRING:
+ String stringValue = stringify(voidPointable);
+ switch (primitiveTypeName) {
+ case BINARY:
+ recordConsumer.addBinary(Binary.fromString(stringValue));
+ break;
+ case BOOLEAN:
+ case INT32:
+ case INT64:
+ case FLOAT:
+ case DOUBLE:
+ case FIXED_LEN_BYTE_ARRAY:
+ case INT96:
+ default:
+ throw new HyracksDataException("Typecast impossible from " + typeTag + " to " + primitiveTypeName);
+ }
+ break;
+ case BOOLEAN:
+ boolean booleanValue = ABooleanSerializerDeserializer.getBoolean(b, s);
+ switch (primitiveTypeName) {
+ case BOOLEAN:
+ recordConsumer.addBoolean(booleanValue);
+ break;
+ case BINARY:
+ case INT32:
+ case INT64:
+ case FLOAT:
+ case DOUBLE:
+ case FIXED_LEN_BYTE_ARRAY:
+ case INT96:
+ default:
+ throw new HyracksDataException("Typecast impossible from " + typeTag + " to " + primitiveTypeName);
+ }
+ break;
+ case DATE:
+ case TIME:
+ case DURATION:
+ case POINT:
+ case POINT3D:
+ case ARRAY:
+ case MULTISET:
+ case OBJECT:
+ case SPARSOBJECT:
+ case UNION:
+ case ENUM:
+ case TYPE:
+ case ANY:
+ case LINE:
+ case POLYGON:
+ case CIRCLE:
+ case RECTANGLE:
+ case INTERVAL:
+ case SYSTEM_NULL:
+ case YEARMONTHDURATION:
+ case DAYTIMEDURATION:
+ case UUID:
+ case SHORTWITHOUTTYPEINFO:
+ case NULL:
+ case GEOMETRY:
+ case BINARY:
+ case UINT8:
+ case UINT16:
+ case UINT32:
+ case UINT64:
+ case BITARRAY:
+ case MISSING:
+ case DATETIME:
+ default:
+ throw new HyracksDataException("TYPE " + typeTag + " UNEXPECTED");
+ }
+
+ }
+
+}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18209
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I40dc16969e66af09cde04b460f441af666b39d51
Gerrit-Change-Number: 18209
Gerrit-PatchSet: 1
Gerrit-Owner: [email protected]
Gerrit-MessageType: newchange