>From Hussain Towaileb <[email protected]>:
Hussain Towaileb has uploaded this change for review (https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18391).
Change subject: [ASTERIXDB-3440][EXT]: Handle legacy parquet schemas
......................................................................
[ASTERIXDB-3440][EXT]: Handle legacy parquet schemas
Change-Id: I829df08c9ad667562194fbee027f63d8b2b31d4b
---
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/RepeatedConverter.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/GenericPrimitiveConverter.java
A
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ObjectRepeatedConverter.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DecimalConverter.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimeConverter.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ObjectConverter.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java
R
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/AbstractComplexConverter.java
A
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/IClosableRepeatedConverter.java
A
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveRepeatedConverter.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UnsignedIntegerConverter.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/JsonStringConverter.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/BinaryConverter.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UUIDConverter.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ArrayConverter.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimestampConverter.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DateConverter.java
18 files changed, 506 insertions(+), 34 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/91/18391/1
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java
index be9690e..254280d 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java
@@ -26,8 +26,8 @@
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
-import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve.DecimalConverter;
import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
import org.apache.asterix.om.types.ARecordType;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/AbstractComplexConverter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/AbstractComplexConverter.java
similarity index 61%
rename from
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/AbstractComplexConverter.java
rename to
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/AbstractComplexConverter.java
index 24ec4c9..6d9ba5d 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/AbstractComplexConverter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/AbstractComplexConverter.java
@@ -16,13 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
-package
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested;
+package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter;
import java.io.DataOutput;
import java.io.IOException;
-import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.IFieldValue;
-import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.AsterixTypeToParquetTypeVisitor;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.ArrayConverter;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.ObjectConverter;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.ObjectRepeatedConverter;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.RepeatedConverter;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve.IClosableRepeatedConverter;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve.PrimitiveRepeatedConverter;
+import org.apache.asterix.om.types.ATypeTag;
import org.apache.hyracks.data.std.api.IMutableValueStorage;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.parquet.io.api.Converter;
@@ -33,22 +39,23 @@
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Type.Repetition;
-public abstract class AbstractComplexConverter extends GroupConverter
implements IFieldValue {
- protected final AbstractComplexConverter parent;
- private final IValueReference fieldName;
- private final String stringFieldName;
- private final int index;
- private final Converter[] converters;
- protected final ParquetConverterContext context;
+public abstract class AbstractComplexConverter extends GroupConverter
+ implements IFieldValue, IClosableRepeatedConverter {
+ protected AbstractComplexConverter parent;
+ protected IValueReference fieldName;
+ protected String stringFieldName;
+ private int index;
+ protected Converter[] converters;
+ protected ParquetConverterContext context;
protected IMutableValueStorage tempStorage;
- AbstractComplexConverter(AbstractComplexConverter parent, int index,
GroupType parquetType,
+ protected AbstractComplexConverter(AbstractComplexConverter parent, int
index, GroupType parquetType,
ParquetConverterContext context) throws IOException {
this(parent, null, index, parquetType, context);
}
- AbstractComplexConverter(AbstractComplexConverter parent, String
stringFieldName, int index, GroupType parquetType,
- ParquetConverterContext context) throws IOException {
+ protected AbstractComplexConverter(AbstractComplexConverter parent, String
stringFieldName, int index,
+ GroupType parquetType, ParquetConverterContext context) throws
IOException {
this.parent = parent;
this.stringFieldName = stringFieldName;
this.fieldName = context.getSerializedFieldName(stringFieldName);
@@ -57,14 +64,17 @@
converters = new Converter[parquetType.getFieldCount()];
for (int i = 0; i < parquetType.getFieldCount(); i++) {
final Type type = parquetType.getType(i);
- if (type.isPrimitive()) {
+
+ LogicalTypeAnnotation typeAnnotation =
type.getLogicalTypeAnnotation();
+ if (type.isPrimitive() && type.getRepetition() !=
Repetition.REPEATED) {
converters[i] = createAtomicConverter(parquetType, i);
- } else if
(LogicalTypeAnnotation.listType().equals(type.getLogicalTypeAnnotation())) {
+ } else if
(LogicalTypeAnnotation.listType().equals(typeAnnotation)) {
converters[i] = createArrayConverter(parquetType, i);
- } else if (type.getRepetition() == Repetition.REPEATED) {
- converters[i] = createRepeatedConverter(parquetType, i);
- } else if (type.getLogicalTypeAnnotation() ==
LogicalTypeAnnotation.mapType()) {
+ } else if (LogicalTypeAnnotation.mapType().equals(typeAnnotation)
+ ||
LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance().equals(typeAnnotation))
{
converters[i] = createArrayConverter(parquetType, i);
+ } else if (isRepeated(parquetType, i)) {
+ continue;
} else {
converters[i] = createObjectConverter(parquetType, i);
}
@@ -131,6 +141,35 @@
return new RepeatedConverter(this, index, repeatedType, context);
}
+    /**
+     * Assigns a converter for a child field that uses the {@code repeated} repetition but is not one of the
+     * LIST/MAP structures handled by the caller (legacy, non-standard Parquet schemas).
+     * <p>
+     * Despite its name, this method has a side effect: when it returns {@code true}, it has stored the chosen
+     * converter in {@code converters[index]}. When it returns {@code false}, nothing was assigned and the caller
+     * falls back to its default handling.
+     *
+     * @param parquetType the group type that owns the child
+     * @param index       the position of the child within {@code parquetType}
+     * @return {@code true} if a converter was assigned to {@code converters[index]}
+     */
+    private boolean isRepeated(GroupType parquetType, int index) throws IOException {
+        Type repeatedType = parquetType.getType(index);
+        if (repeatedType.getRepetition() != Repetition.REPEATED) {
+            return false;
+        }
+
+        // Inside a list, a repeated child describes the list's elements, not a list of its own.
+        boolean insideList = this instanceof ArrayConverter;
+
+        if (repeatedType.isPrimitive()) {
+            if (insideList) {
+                // Legacy 2-level list: the repeated primitive is itself the element type, so every value is
+                // added directly to the enclosing list (same handling as before legacy support was added).
+                converters[index] = createAtomicConverter(parquetType, index);
+            } else {
+                // A bare repeated primitive is a legacy array whose items have the primitive's type.
+                ATypeTag typeTag = AsterixTypeToParquetTypeVisitor.mapType(repeatedType, context, null);
+                converters[index] = new PrimitiveRepeatedConverter(typeTag, this, repeatedType.asPrimitiveType(),
+                        index, context);
+            }
+            return true;
+        }
+
+        if (repeatedType.getLogicalTypeAnnotation() != null) {
+            // Annotated repeated groups are left to the caller's default handling.
+            return false;
+        }
+
+        GroupType groupType = repeatedType.asGroupType();
+        if (insideList) {
+            // The repeated group wraps the elements of the enclosing list.
+            converters[index] = new RepeatedConverter(this, index, groupType, context);
+        } else {
+            // A repeated group outside any LIST structure is a legacy array of objects.
+            converters[index] = new ObjectRepeatedConverter(this, groupType.getName(), index, groupType, context);
+        }
+        return true;
+    }
+
@Override
public String getStringFieldName() {
return stringFieldName;
@@ -171,4 +210,13 @@
}
parent.addValue(this);
}
+
+    /**
+     * Calls {@link IClosableRepeatedConverter#internalEnd()} on every direct child converter, so that converters
+     * for non-standard (legacy) repeated fields can emit what they accumulated. Children that do not implement
+     * {@link IClosableRepeatedConverter} are skipped instead of failing with a {@link ClassCastException}.
+     */
+    protected void closeDirectRepeatedChildren() {
+        for (Converter converter : converters) {
+            if (converter instanceof IClosableRepeatedConverter) {
+                ((IClosableRepeatedConverter) converter).internalEnd();
+            }
+        }
+    }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ArrayConverter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ArrayConverter.java
index 89647e0..0fed70b 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ArrayConverter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ArrayConverter.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import org.apache.asterix.builders.IAsterixListBuilder;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.IFieldValue;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve.PrimitiveConverterProvider;
@@ -31,7 +32,7 @@
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.PrimitiveType;
-class ArrayConverter extends AbstractComplexConverter {
+public class ArrayConverter extends AbstractComplexConverter {
private IAsterixListBuilder builder;
public ArrayConverter(AbstractComplexConverter parent, int index,
GroupType parquetType,
@@ -52,6 +53,7 @@
@Override
public void end() {
+ closeDirectRepeatedChildren();
try {
builder.write(getParentDataOutput(), true);
} catch (IOException e) {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ObjectConverter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ObjectConverter.java
index 542318b..6b63a7b 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ObjectConverter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ObjectConverter.java
@@ -22,6 +22,7 @@
import org.apache.asterix.builders.IARecordBuilder;
import
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.IFieldValue;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve.PrimitiveConverterProvider;
@@ -34,7 +35,7 @@
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.PrimitiveType;
-class ObjectConverter extends AbstractComplexConverter {
+public class ObjectConverter extends AbstractComplexConverter {
private IARecordBuilder builder;
/**
* {@link IExternalFilterValueEmbedder} decides whether the object should
be ignored entirely
@@ -66,6 +67,7 @@
@Override
public void end() {
+ closeDirectRepeatedChildren();
if (!ignore) {
writeToParent();
context.getValueEmbedder().exitObject();
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ObjectRepeatedConverter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ObjectRepeatedConverter.java
new file mode 100644
index 0000000..ef944ed
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ObjectRepeatedConverter.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.asterix.builders.IARecordBuilder;
+import org.apache.asterix.builders.IAsterixListBuilder;
+import
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.IFieldValue;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve.PrimitiveConverterProvider;
+import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IMutableValueStorage;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.PrimitiveType;
+
+/**
+ * Handles the following non-standard (legacy) Parquet schema scenario: a repeated group that is not wrapped in a
+ * LIST-annotated group. For example, the value
+ *
+ * <pre>
+ * {
+ *   "my_group_list": [{"date": "123", "author": "abc"}, {"date": "456", "author": "xyz"}]
+ * }
+ * </pre>
+ *
+ * represented as:
+ *
+ * <pre>
+ * message Product {
+ *   repeated group my_group_list {
+ *     required binary date (STRING);
+ *     required binary author (STRING);
+ *   }
+ * }
+ * </pre>
+ *
+ * instead of the standard 3-level LIST structure:
+ *
+ * <pre>
+ * message Product {
+ *   required group my_group_list (LIST) {
+ *     repeated group list {
+ *       required group element {
+ *         required binary date (STRING);
+ *         required binary author (STRING);
+ *       }
+ *     }
+ *   }
+ * }
+ * </pre>
+ *
+ * In this case the repeated group is itself a list, and the group's type is the type of the list's items. Each
+ * {@link #start()}/{@link #end()} pair builds one object and appends it to the list. The list is written to the
+ * parent when {@link #internalEnd()} is called.
+ */
+public class ObjectRepeatedConverter extends AbstractComplexConverter {
+    private IAsterixListBuilder listBuilder;
+    private IMutableValueStorage listStorage;
+
+    private IARecordBuilder recordBuilder;
+    /**
+     * {@link IExternalFilterValueEmbedder} decides whether the object should be ignored entirely
+     */
+    private boolean ignore = false;
+
+    public ObjectRepeatedConverter(AbstractComplexConverter parent, String stringFieldName, int index,
+            GroupType parquetType, ParquetConverterContext context) throws IOException {
+        super(parent, stringFieldName, index, parquetType, context);
+    }
+
+    /**
+     * Starts a new list element (object).
+     */
+    @Override
+    public void start() {
+        tempStorage = context.enterObject();
+        recordBuilder = context.getObjectBuilder(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+        IExternalFilterValueEmbedder valueEmbedder = context.getValueEmbedder();
+        if (isRoot()) {
+            valueEmbedder.reset();
+            valueEmbedder.enterObject();
+        } else {
+            ignore = checkValueEmbedder(valueEmbedder);
+        }
+    }
+
+    /**
+     * Finishes the current element. Legacy repeated children are flushed into the element first, then the element
+     * is appended to the list, unless the value embedder already supplied it.
+     */
+    @Override
+    public void end() {
+        closeDirectRepeatedChildren();
+        if (!ignore) {
+            writeToList();
+            context.getValueEmbedder().exitObject();
+        }
+        context.exitObject(tempStorage, null, recordBuilder);
+        tempStorage = null;
+        recordBuilder = null;
+        ignore = false;
+    }
+
+    private void writeToList() {
+        try {
+            finalizeEmbedding();
+            recordBuilder.write(getListDataOutput(), true);
+            addValueToList();
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    private void addValueToList() {
+        try {
+            listBuilder.addItem(listStorage);
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
+     * Returns the output for the next list item. The list is started lazily on first use, and the item storage is
+     * reset on every call.
+     */
+    public DataOutput getListDataOutput() {
+        if (listStorage == null) {
+            internalStart();
+        }
+        listStorage.reset();
+        return listStorage.getDataOutput();
+    }
+
+    @Override
+    public ATypeTag getTypeTag() {
+        return ATypeTag.ARRAY;
+    }
+
+    private void internalStart() {
+        listStorage = context.enterCollection();
+        listBuilder = context.getCollectionBuilder(DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE);
+    }
+
+    /**
+     * Writes the accumulated list to the parent and releases the list resources. The parent calls this when it
+     * ends (through {@code closeDirectRepeatedChildren()}), after all elements for the parent value were converted.
+     */
+    @Override
+    public void internalEnd() {
+        if (listBuilder == null) {
+            // The repeated group did not occur for this parent value, so no list was started and nothing is written.
+            // NOTE(review): the field is omitted rather than emitted as an empty array - confirm this is intended.
+            return;
+        }
+        // Children were already closed by end() after each element. Closing them again here would flush nested
+        // legacy repeated children a second time, after their state has been released.
+        try {
+            listBuilder.write(parent.getDataOutput(), true);
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+        parent.addValue(this);
+        context.exitCollection(listStorage, listBuilder);
+        listStorage = null;
+        listBuilder = null;
+    }
+
+    @Override
+    public void addValue(IFieldValue value) {
+        if (ignore) {
+            // The value was embedded already
+            return;
+        }
+        IExternalFilterValueEmbedder valueEmbedder = context.getValueEmbedder();
+        IValueReference childFieldName = value.getFieldName();
+        try {
+            if (valueEmbedder.shouldEmbed(value.getStringFieldName(), value.getTypeTag())) {
+                recordBuilder.addField(childFieldName, valueEmbedder.getEmbeddedValue());
+            } else {
+                recordBuilder.addField(childFieldName, getValue());
+            }
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    protected PrimitiveConverter createAtomicConverter(GroupType type, int index) {
+        try {
+            PrimitiveType primitiveType = type.getType(index).asPrimitiveType();
+            String childFieldName = type.getFieldName(index);
+            return PrimitiveConverterProvider.createPrimitiveConverter(primitiveType, this, childFieldName, index,
+                    context);
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    protected ArrayConverter createArrayConverter(GroupType type, int index) {
+        try {
+            GroupType arrayType = type.getType(index).asGroupType();
+            String childFieldName = type.getFieldName(index);
+            return new ArrayConverter(this, childFieldName, index, arrayType, context);
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    protected ObjectConverter createObjectConverter(GroupType type, int index) {
+        try {
+            GroupType objectType = type.getType(index).asGroupType();
+            String childFieldName = type.getFieldName(index);
+            return new ObjectConverter(this, childFieldName, index, objectType, context);
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    protected boolean isRoot() {
+        return false;
+    }
+
+    /**
+     * Asks the embedder whether the current element should be replaced by an embedded value. If so, the embedded
+     * value is appended to this list as the element, and the element's own fields are ignored.
+     */
+    private boolean checkValueEmbedder(IExternalFilterValueEmbedder valueEmbedder) {
+        boolean embed = valueEmbedder.shouldEmbed(getStringFieldName(), ATypeTag.OBJECT);
+        if (embed) {
+            // Write the embedded value into the list's item storage (starting the list if needed) before adding
+            // it. Writing it elsewhere would add a null or stale item to the list.
+            IValueReference embeddedValue = valueEmbedder.getEmbeddedValue();
+            try {
+                getListDataOutput().write(embeddedValue.getByteArray(), embeddedValue.getStartOffset(),
+                        embeddedValue.getLength());
+            } catch (IOException e) {
+                throw new IllegalStateException(e);
+            }
+            addValueToList();
+        } else {
+            valueEmbedder.enterObject();
+        }
+        return embed;
+    }
+
+    private void finalizeEmbedding() throws IOException {
+        IExternalFilterValueEmbedder valueEmbedder = context.getValueEmbedder();
+        if (valueEmbedder.isMissingEmbeddedValues()) {
+            String[] embeddedFieldNames = valueEmbedder.getEmbeddedFieldNames();
+            for (int i = 0; i < embeddedFieldNames.length; i++) {
+                String embeddedFieldName = embeddedFieldNames[i];
+                if (valueEmbedder.isMissing(embeddedFieldName)) {
+                    IValueReference embeddedValue = valueEmbedder.getEmbeddedValue();
+                    recordBuilder.addField(context.getSerializedFieldName(embeddedFieldName), embeddedValue);
+                }
+            }
+        }
+    }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/RepeatedConverter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/RepeatedConverter.java
index 3936ad7..87d0ae4 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/RepeatedConverter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/RepeatedConverter.java
@@ -21,6 +21,7 @@
import java.io.DataOutput;
import java.io.IOException;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.IFieldValue;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve.PrimitiveConverterProvider;
@@ -29,7 +30,7 @@
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.PrimitiveType;
-class RepeatedConverter extends AbstractComplexConverter {
+public class RepeatedConverter extends AbstractComplexConverter {
public RepeatedConverter(AbstractComplexConverter parent, int index,
GroupType parquetType,
ParquetConverterContext context) throws IOException {
super(parent, index, parquetType, context);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/BinaryConverter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/BinaryConverter.java
index 4f371c8..c5e7120 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/BinaryConverter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/BinaryConverter.java
@@ -20,8 +20,8 @@
import java.io.IOException;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
-import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.parquet.io.api.Binary;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DateConverter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DateConverter.java
index 07a5f79..4fd267c 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DateConverter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DateConverter.java
@@ -20,8 +20,8 @@
import java.io.IOException;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
-import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
import org.apache.asterix.om.types.ATypeTag;
class DateConverter extends GenericPrimitiveConverter {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DecimalConverter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DecimalConverter.java
index 81fb36d..343876c 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DecimalConverter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DecimalConverter.java
@@ -23,8 +23,8 @@
import java.math.BigInteger;
import java.nio.ByteBuffer;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
-import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.parquet.io.api.Binary;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/GenericPrimitiveConverter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/GenericPrimitiveConverter.java
index 20f82f9..e708313 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/GenericPrimitiveConverter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/GenericPrimitiveConverter.java
@@ -20,16 +20,16 @@
import java.io.IOException;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.IFieldValue;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
-import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.PrimitiveConverter;
-public class GenericPrimitiveConverter extends PrimitiveConverter implements
IFieldValue {
- private final ATypeTag typeTag;
+public class GenericPrimitiveConverter extends PrimitiveConverter implements
IFieldValue, IClosableRepeatedConverter {
+ protected ATypeTag typeTag;
protected final AbstractComplexConverter parent;
protected final String stringFieldName;
protected final IValueReference fieldName;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/IClosableRepeatedConverter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/IClosableRepeatedConverter.java
new file mode 100644
index 0000000..b23bbf9
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/IClosableRepeatedConverter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve;
+
+/**
+ * End-of-parent hook for converters that read repeated fields from non-standard (legacy) Parquet schemas, i.e.
+ * repeated fields that are not wrapped in a LIST-annotated group. Implementations document their own scenario.
+ */
+public interface IClosableRepeatedConverter {
+
+    /**
+     * Performs whatever finishing work a repeated converter needs once its parent is done. The default
+     * implementation does nothing.
+     */
+    default void internalEnd() {
+        // no-op unless overridden
+    }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/JsonStringConverter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/JsonStringConverter.java
index 8e4c556..7b106e8 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/JsonStringConverter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/JsonStringConverter.java
@@ -21,8 +21,8 @@
import java.io.DataOutput;
import java.io.IOException;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
-import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
import org.apache.asterix.external.parser.JSONDataParser;
import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
import org.apache.asterix.om.types.ATypeTag;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java
index 0f36d11..0e16c21 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java
@@ -21,8 +21,8 @@
import java.io.IOException;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.AsterixTypeToParquetTypeVisitor;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
-import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.LogicalTypeAnnotation;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveRepeatedConverter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveRepeatedConverter.java
new file mode 100644
index 0000000..a832484
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveRepeatedConverter.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.asterix.builders.IAsterixListBuilder;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
+import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
+import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IMutableValueStorage;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveType;
+
+/**
+ * Handles the following non-standard Parquet schema scenario: a repeated primitive field that is not wrapped
+ * in a LIST-annotated group. For example, the document
+ *
+ * <pre>
+ * {
+ *     "my_primitive_array": ["item1", "item2"]
+ * }
+ * </pre>
+ *
+ * represented as:
+ *
+ * <pre>
+ * message schema {
+ *     repeated binary my_primitive_array (STRING);
+ * }
+ * </pre>
+ *
+ * instead of the standard three-level form:
+ *
+ * <pre>
+ * message schema {
+ *     required group my_primitive_array (LIST) {
+ *         repeated group list {
+ *             optional binary element (STRING);
+ *         }
+ *     }
+ * }
+ * </pre>
+ *
+ * In this layout the repeated field itself is the list, and its primitive type is the element type. Every value
+ * delivered for the field is serialized into temporary storage and appended to an ordered-list builder, which
+ * {@link #internalEnd()} then writes to the parent converter.
+ */
+public class PrimitiveRepeatedConverter extends GenericPrimitiveConverter {
+    private IAsterixListBuilder builder;
+
+    protected IMutableValueStorage tempStorage;
+
+    public PrimitiveRepeatedConverter(ATypeTag typeTag, AbstractComplexConverter parent, PrimitiveType type, int index,
+            ParquetConverterContext context) throws IOException {
+        super(typeTag, parent, type.getName(), index, context);
+    }
+
+    /**
+     * Acquires the temporary item storage and the list builder; called lazily when the first item arrives.
+     */
+    private void internalStart() {
+        tempStorage = context.enterCollection();
+        builder = context.getCollectionBuilder(DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE);
+    }
+
+    /**
+     * Writes the accumulated list to the parent, then returns the temporary storage and builder to the context
+     * and clears them so the next list starts fresh. If no item arrived since the previous call, an empty list is
+     * written: per the Parquet backward-compatibility rules, such a repeated field is a required list.
+     */
+    @Override
+    public void internalEnd() {
+        if (builder == null) {
+            // No item was delivered, so nothing has been started yet; start now to emit an empty list instead of
+            // failing with a NullPointerException below.
+            internalStart();
+        }
+        try {
+            builder.write(parent.getDataOutput(), true);
+            parent.addValue(this);
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        } finally {
+            // Hand the storage and builder back even if writing failed, and reset so a later record cannot
+            // append to a stale builder.
+            context.exitCollection(tempStorage, builder);
+            tempStorage = null;
+            builder = null;
+        }
+    }
+
+    /**
+     * Returns a cleared output for serializing the next list item, starting a new list if none is in progress.
+     *
+     * @return the output of the temporary item storage
+     */
+    public DataOutput getDataOutput() {
+        if (tempStorage == null) {
+            internalStart();
+        }
+        tempStorage.reset();
+        return tempStorage.getDataOutput();
+    }
+
+    /**
+     * Appends the item currently held in the temporary storage to the list being built.
+     */
+    public void addValue() {
+        try {
+            builder.addItem(tempStorage);
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /** Binary values are serialized as strings. */
+    @Override
+    public void addBinary(Binary value) {
+        context.serializeString(value, getDataOutput());
+        addValue();
+    }
+
+    @Override
+    public void addBoolean(boolean value) {
+        context.serializeBoolean(value, getDataOutput());
+        addValue();
+    }
+
+    /** Floats are widened and stored as doubles. */
+    @Override
+    public void addFloat(float value) {
+        addDouble(value);
+    }
+
+    @Override
+    public void addDouble(double value) {
+        context.serializeDouble(value, getDataOutput());
+        addValue();
+    }
+
+    /** Ints are widened and stored as 64-bit integers. */
+    @Override
+    public void addInt(int value) {
+        addLong(value);
+    }
+
+    @Override
+    public void addLong(long value) {
+        context.serializeInt64(value, getDataOutput());
+        addValue();
+    }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimeConverter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimeConverter.java
index c78d6e7..e854a64 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimeConverter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimeConverter.java
@@ -21,8 +21,8 @@
import java.io.IOException;
import java.util.concurrent.TimeUnit;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
-import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.parquet.schema.LogicalTypeAnnotation;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimestampConverter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimestampConverter.java
index c36de37..4d0a61d 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimestampConverter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimestampConverter.java
@@ -22,8 +22,8 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
-import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.LogicalTypeAnnotation;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UUIDConverter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UUIDConverter.java
index 23667ca..4384359 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UUIDConverter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UUIDConverter.java
@@ -20,8 +20,8 @@
import java.io.IOException;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
-import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.parquet.io.api.Binary;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UnsignedIntegerConverter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UnsignedIntegerConverter.java
index 5bb4ad7..64b0ad7 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UnsignedIntegerConverter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UnsignedIntegerConverter.java
@@ -21,8 +21,8 @@
import java.io.IOException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
-import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.hyracks.api.exceptions.Warning;
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18391
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I829df08c9ad667562194fbee027f63d8b2b31d4b
Gerrit-Change-Number: 18391
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>
Gerrit-MessageType: newchange