>From Hussain Towaileb <[email protected]>:

Hussain Towaileb has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18391 )


Change subject: [ASTERIXDB-3440][EXT]: Handle legacy parquet schemas
......................................................................

[ASTERIXDB-3440][EXT]: Handle legacy parquet schemas

Change-Id: I829df08c9ad667562194fbee027f63d8b2b31d4b
---
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/RepeatedConverter.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/GenericPrimitiveConverter.java
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ObjectRepeatedConverter.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DecimalConverter.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimeConverter.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ObjectConverter.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java
R 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/AbstractComplexConverter.java
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/IClosableRepeatedConverter.java
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveRepeatedConverter.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UnsignedIntegerConverter.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/JsonStringConverter.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/BinaryConverter.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UUIDConverter.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ArrayConverter.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimestampConverter.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DateConverter.java
18 files changed, 506 insertions(+), 34 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/91/18391/1

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java
index be9690e..254280d 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java
@@ -26,8 +26,8 @@

 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
 import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
-import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
 import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve.DecimalConverter;
 import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
 import org.apache.asterix.om.types.ARecordType;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/AbstractComplexConverter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/AbstractComplexConverter.java
similarity index 61%
rename from 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/AbstractComplexConverter.java
rename to 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/AbstractComplexConverter.java
index 24ec4c9..6d9ba5d 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/AbstractComplexConverter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/AbstractComplexConverter.java
@@ -16,13 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested;
+package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter;

 import java.io.DataOutput;
 import java.io.IOException;

-import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.IFieldValue;
-import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.AsterixTypeToParquetTypeVisitor;
+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.ArrayConverter;
+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.ObjectConverter;
+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.ObjectRepeatedConverter;
+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.RepeatedConverter;
+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve.IClosableRepeatedConverter;
+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve.PrimitiveRepeatedConverter;
+import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.data.std.api.IMutableValueStorage;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.parquet.io.api.Converter;
@@ -33,22 +39,23 @@
 import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Type.Repetition;

-public abstract class AbstractComplexConverter extends GroupConverter 
implements IFieldValue {
-    protected final AbstractComplexConverter parent;
-    private final IValueReference fieldName;
-    private final String stringFieldName;
-    private final int index;
-    private final Converter[] converters;
-    protected final ParquetConverterContext context;
+public abstract class AbstractComplexConverter extends GroupConverter
+        implements IFieldValue, IClosableRepeatedConverter {
+    protected AbstractComplexConverter parent;
+    protected IValueReference fieldName;
+    protected String stringFieldName;
+    private int index;
+    protected Converter[] converters;
+    protected ParquetConverterContext context;
     protected IMutableValueStorage tempStorage;

-    AbstractComplexConverter(AbstractComplexConverter parent, int index, 
GroupType parquetType,
+    protected AbstractComplexConverter(AbstractComplexConverter parent, int 
index, GroupType parquetType,
             ParquetConverterContext context) throws IOException {
         this(parent, null, index, parquetType, context);
     }

-    AbstractComplexConverter(AbstractComplexConverter parent, String 
stringFieldName, int index, GroupType parquetType,
-            ParquetConverterContext context) throws IOException {
+    protected AbstractComplexConverter(AbstractComplexConverter parent, String 
stringFieldName, int index,
+            GroupType parquetType, ParquetConverterContext context) throws 
IOException {
         this.parent = parent;
         this.stringFieldName = stringFieldName;
         this.fieldName = context.getSerializedFieldName(stringFieldName);
@@ -57,14 +64,17 @@
         converters = new Converter[parquetType.getFieldCount()];
         for (int i = 0; i < parquetType.getFieldCount(); i++) {
             final Type type = parquetType.getType(i);
-            if (type.isPrimitive()) {
+
+            LogicalTypeAnnotation typeAnnotation = 
type.getLogicalTypeAnnotation();
+            if (type.isPrimitive() && type.getRepetition() != 
Repetition.REPEATED) {
                 converters[i] = createAtomicConverter(parquetType, i);
-            } else if 
(LogicalTypeAnnotation.listType().equals(type.getLogicalTypeAnnotation())) {
+            } else if 
(LogicalTypeAnnotation.listType().equals(typeAnnotation)) {
                 converters[i] = createArrayConverter(parquetType, i);
-            } else if (type.getRepetition() == Repetition.REPEATED) {
-                converters[i] = createRepeatedConverter(parquetType, i);
-            } else if (type.getLogicalTypeAnnotation() == 
LogicalTypeAnnotation.mapType()) {
+            } else if (LogicalTypeAnnotation.mapType().equals(typeAnnotation)
+                    || 
LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance().equals(typeAnnotation))
 {
                 converters[i] = createArrayConverter(parquetType, i);
+            } else if (isRepeated(parquetType, i)) {
+                continue;
             } else {
                 converters[i] = createObjectConverter(parquetType, i);
             }
@@ -131,6 +141,35 @@
         return new RepeatedConverter(this, index, repeatedType, context);
     }

+    private boolean isRepeated(GroupType parquetType, int index) throws 
IOException {
+        Type repeatedType = parquetType.getType(index);
+        LogicalTypeAnnotation typeAnnotation = 
repeatedType.getLogicalTypeAnnotation();
+
+        if (repeatedType.getRepetition() == Repetition.REPEATED) {
+            // a repeated primitive is a legacy array whose element type is 
repeatedType.getLogicalTypeAnnotation()
+            if (repeatedType.isPrimitive()) {
+                ATypeTag typeTag = 
AsterixTypeToParquetTypeVisitor.mapType(repeatedType, context, null);
+                converters[index] = new PrimitiveRepeatedConverter(typeTag, 
this, repeatedType.asPrimitiveType(),
+                        index, context);
+                return true;
+            }
+
+            // group repeated
+            if (repeatedType.getName() != null && typeAnnotation == null) {
+                GroupType groupType = repeatedType.asGroupType();
+
+                // if contained inside a list, then this is a repeated for the 
list
+                if (this instanceof ArrayConverter) {
+                    converters[index] = new RepeatedConverter(this, index, 
repeatedType.asGroupType(), context);
+                } else {
+                    converters[index] = new ObjectRepeatedConverter(this, 
groupType.getName(), index, groupType, context);
+                }
+                return true;
+            }
+        }
+        return false;
+    }
+
     @Override
     public String getStringFieldName() {
         return stringFieldName;
@@ -171,4 +210,13 @@
         }
         parent.addValue(this);
     }
+
+    /**
+     * This closes non-standard repeated converters, check implementers of 
{@link IClosableRepeatedConverter}
+     */
+    protected void closeDirectRepeatedChildren() {
+        for (Converter converter : converters) {
+            ((IClosableRepeatedConverter) converter).internalEnd();
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ArrayConverter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ArrayConverter.java
index 89647e0..0fed70b 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ArrayConverter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ArrayConverter.java
@@ -21,6 +21,7 @@
 import java.io.IOException;

 import org.apache.asterix.builders.IAsterixListBuilder;
+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
 import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.IFieldValue;
 import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
 import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve.PrimitiveConverterProvider;
@@ -31,7 +32,7 @@
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.PrimitiveType;

-class ArrayConverter extends AbstractComplexConverter {
+public class ArrayConverter extends AbstractComplexConverter {
     private IAsterixListBuilder builder;

     public ArrayConverter(AbstractComplexConverter parent, int index, 
GroupType parquetType,
@@ -52,6 +53,7 @@

     @Override
     public void end() {
+        closeDirectRepeatedChildren();
         try {
             builder.write(getParentDataOutput(), true);
         } catch (IOException e) {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ObjectConverter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ObjectConverter.java
index 542318b..6b63a7b 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ObjectConverter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ObjectConverter.java
@@ -22,6 +22,7 @@

 import org.apache.asterix.builders.IARecordBuilder;
 import 
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
 import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.IFieldValue;
 import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
 import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve.PrimitiveConverterProvider;
@@ -34,7 +35,7 @@
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.PrimitiveType;

-class ObjectConverter extends AbstractComplexConverter {
+public class ObjectConverter extends AbstractComplexConverter {
     private IARecordBuilder builder;
     /**
      * {@link IExternalFilterValueEmbedder} decides whether the object should 
be ignored entirely
@@ -66,6 +67,7 @@

     @Override
     public void end() {
+        closeDirectRepeatedChildren();
         if (!ignore) {
             writeToParent();
             context.getValueEmbedder().exitObject();
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ObjectRepeatedConverter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ObjectRepeatedConverter.java
new file mode 100644
index 0000000..ef944ed
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/ObjectRepeatedConverter.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.asterix.builders.IARecordBuilder;
+import org.apache.asterix.builders.IAsterixListBuilder;
+import 
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.IFieldValue;
+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve.PrimitiveConverterProvider;
+import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IMutableValueStorage;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.PrimitiveType;
+
+/**
+ * Handles the following non-standard parquet schema scenario:
+ * a repeated group (a legacy list) that is not contained in a LIST structure, for example
+ *
+ * {
+ *     "my_group_list": [{"date": "123", "author": "abc"}, {"date": "456", 
"author": "xyz"}]
+ * }
+ *
+ * Represented as:
+ * message Product {
+ *   repeated group name=my_group_list {
+ *     required binary name=date (STRING);
+ *     required binary name=author (STRING);
+ *   }
+ * }
+ *
+ * Instead of:
+ * message arrow_schema {
+ *   required group myGroupArray (LIST) {
+ *     repeated group list {
+ *       optional group  {
+ *         optional binary hello (STRING);
+ *         optional binary foo (STRING);
+ *       }
+ *     }
+ *   }
+ * }
+ *
+ *  In this case, the repeated group itself is the list, and the group's type is the type of 
the elements in the list
+ */
+public class ObjectRepeatedConverter extends AbstractComplexConverter {
+    private IAsterixListBuilder listBuilder;
+    private IMutableValueStorage listStorage;
+
+    private IARecordBuilder recordBuilder;
+    /**
+     * {@link IExternalFilterValueEmbedder} decides whether the object should 
be ignored entirely
+     */
+    private boolean ignore = false;
+
+    public ObjectRepeatedConverter(AbstractComplexConverter parent, String 
stringFieldName, int index,
+            GroupType parquetType, ParquetConverterContext context) throws 
IOException {
+        super(parent, stringFieldName, index, parquetType, context);
+    }
+
+    @Override
+    public void start() {
+        tempStorage = context.enterObject();
+        recordBuilder = 
context.getObjectBuilder(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+        IExternalFilterValueEmbedder valueEmbedder = 
context.getValueEmbedder();
+        if (isRoot()) {
+            valueEmbedder.reset();
+            valueEmbedder.enterObject();
+        } else {
+            ignore = checkValueEmbedder(valueEmbedder);
+        }
+    }
+
+    @Override
+    public void end() {
+        closeDirectRepeatedChildren();
+        if (!ignore) {
+            writeToList();
+            context.getValueEmbedder().exitObject();
+        }
+        context.exitObject(tempStorage, null, recordBuilder);
+        tempStorage = null;
+        recordBuilder = null;
+        ignore = false;
+    }
+
+    private void writeToList() {
+        try {
+            finalizeEmbedding();
+            recordBuilder.write(getListDataOutput(), true);
+            addValueToList();
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    private void addValueToList() {
+        try {
+            listBuilder.addItem(listStorage);
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public DataOutput getListDataOutput() {
+        if (listStorage == null) {
+            internalStart();
+        }
+        listStorage.reset();
+        return listStorage.getDataOutput();
+    }
+
+    @Override
+    public ATypeTag getTypeTag() {
+        return ATypeTag.ARRAY;
+    }
+
+    private void internalStart() {
+        listStorage = context.enterCollection();
+        listBuilder = 
context.getCollectionBuilder(DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE);
+    }
+
+    @Override
+    public void internalEnd() {
+        closeDirectRepeatedChildren();
+        try {
+            listBuilder.write(parent.getDataOutput(), true);
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+        parent.addValue(this);
+        context.exitCollection(listStorage, listBuilder);
+        listStorage = null;
+        listBuilder = null;
+    }
+
+    @Override
+    public void addValue(IFieldValue value) {
+        if (ignore) {
+            // The value was embedded already
+            return;
+        }
+        IExternalFilterValueEmbedder valueEmbedder = 
context.getValueEmbedder();
+        IValueReference fieldName = value.getFieldName();
+        try {
+            if (valueEmbedder.shouldEmbed(value.getStringFieldName(), 
value.getTypeTag())) {
+                recordBuilder.addField(fieldName, 
valueEmbedder.getEmbeddedValue());
+            } else {
+                recordBuilder.addField(fieldName, getValue());
+            }
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    protected PrimitiveConverter createAtomicConverter(GroupType type, int 
index) {
+        try {
+            PrimitiveType primitiveType = 
type.getType(index).asPrimitiveType();
+            String childFieldName = type.getFieldName(index);
+            return 
PrimitiveConverterProvider.createPrimitiveConverter(primitiveType, this, 
childFieldName, index,
+                    context);
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    protected ArrayConverter createArrayConverter(GroupType type, int index) {
+        try {
+            GroupType arrayType = type.getType(index).asGroupType();
+            String childFieldName = type.getFieldName(index);
+            return new ArrayConverter(this, childFieldName, index, arrayType, 
context);
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    protected ObjectConverter createObjectConverter(GroupType type, int index) 
{
+        try {
+            GroupType objectType = type.getType(index).asGroupType();
+            String childFieldName = type.getFieldName(index);
+            return new ObjectConverter(this, childFieldName, index, 
objectType, context);
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    protected boolean isRoot() {
+        return false;
+    }
+
+    private boolean checkValueEmbedder(IExternalFilterValueEmbedder 
valueEmbedder) {
+        boolean embed = valueEmbedder.shouldEmbed(getStringFieldName(), 
ATypeTag.OBJECT);
+        if (embed) {
+            ((ArrayBackedValueStorage) 
parent.getValue()).set(valueEmbedder.getEmbeddedValue());
+            addValueToList();
+        } else {
+            valueEmbedder.enterObject();
+        }
+        return embed;
+    }
+
+    private void finalizeEmbedding() throws IOException {
+        IExternalFilterValueEmbedder valueEmbedder = 
context.getValueEmbedder();
+        if (valueEmbedder.isMissingEmbeddedValues()) {
+            String[] embeddedFieldNames = 
valueEmbedder.getEmbeddedFieldNames();
+            for (int i = 0; i < embeddedFieldNames.length; i++) {
+                String embeddedFieldName = embeddedFieldNames[i];
+                if (valueEmbedder.isMissing(embeddedFieldName)) {
+                    IValueReference embeddedValue = 
valueEmbedder.getEmbeddedValue();
+                    
recordBuilder.addField(context.getSerializedFieldName(embeddedFieldName), 
embeddedValue);
+                }
+            }
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/RepeatedConverter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/RepeatedConverter.java
index 3936ad7..87d0ae4 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/RepeatedConverter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/nested/RepeatedConverter.java
@@ -21,6 +21,7 @@
 import java.io.DataOutput;
 import java.io.IOException;

+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
 import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.IFieldValue;
 import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
 import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve.PrimitiveConverterProvider;
@@ -29,7 +30,7 @@
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.PrimitiveType;

-class RepeatedConverter extends AbstractComplexConverter {
+public class RepeatedConverter extends AbstractComplexConverter {
     public RepeatedConverter(AbstractComplexConverter parent, int index, 
GroupType parquetType,
             ParquetConverterContext context) throws IOException {
         super(parent, index, parquetType, context);
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/BinaryConverter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/BinaryConverter.java
index 4f371c8..c5e7120 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/BinaryConverter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/BinaryConverter.java
@@ -20,8 +20,8 @@

 import java.io.IOException;

+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
 import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
-import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.parquet.io.api.Binary;

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DateConverter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DateConverter.java
index 07a5f79..4fd267c 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DateConverter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DateConverter.java
@@ -20,8 +20,8 @@

 import java.io.IOException;

+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
 import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
-import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
 import org.apache.asterix.om.types.ATypeTag;

 class DateConverter extends GenericPrimitiveConverter {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DecimalConverter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DecimalConverter.java
index 81fb36d..343876c 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DecimalConverter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/DecimalConverter.java
@@ -23,8 +23,8 @@
 import java.math.BigInteger;
 import java.nio.ByteBuffer;

+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
 import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
-import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.parquet.io.api.Binary;

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/GenericPrimitiveConverter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/GenericPrimitiveConverter.java
index 20f82f9..e708313 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/GenericPrimitiveConverter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/GenericPrimitiveConverter.java
@@ -20,16 +20,16 @@

 import java.io.IOException;

+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
 import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.IFieldValue;
 import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
-import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.PrimitiveConverter;

-public class GenericPrimitiveConverter extends PrimitiveConverter implements 
IFieldValue {
-    private final ATypeTag typeTag;
+public class GenericPrimitiveConverter extends PrimitiveConverter implements 
IFieldValue, IClosableRepeatedConverter {
+    protected ATypeTag typeTag;
     protected final AbstractComplexConverter parent;
     protected final String stringFieldName;
     protected final IValueReference fieldName;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/IClosableRepeatedConverter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/IClosableRepeatedConverter.java
new file mode 100644
index 0000000..b23bbf9
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/IClosableRepeatedConverter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve;
+
+/**
+ * Special-case interface to handle scenarios that do not follow the latest
+ * Parquet standards. See the implementations of its methods for more details.
+ */
+public interface IClosableRepeatedConverter {
+
+    /**
+     * Calls any necessary end operations for repeated types
+     */
+    default void internalEnd() {
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/JsonStringConverter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/JsonStringConverter.java
index 8e4c556..7b106e8 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/JsonStringConverter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/JsonStringConverter.java
@@ -21,8 +21,8 @@
 import java.io.DataOutput;
 import java.io.IOException;

+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
 import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
-import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
 import org.apache.asterix.external.parser.JSONDataParser;
 import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
 import org.apache.asterix.om.types.ATypeTag;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java
index 0f36d11..0e16c21 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java
@@ -21,8 +21,8 @@
 import java.io.IOException;

 import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.AsterixTypeToParquetTypeVisitor;
+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
 import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
-import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.parquet.io.api.PrimitiveConverter;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveRepeatedConverter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveRepeatedConverter.java
new file mode 100644
index 0000000..a832484
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveRepeatedConverter.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.asterix.builders.IAsterixListBuilder;
+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
+import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IMutableValueStorage;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveType;
+
+/**
+ * Handles the following non-standard parquet schema scenario:
+ * a repeated primitive field that is not wrapped in a LIST group, for example:
+ *
+ * {
+ *     "my_primitive_array": ["item1", "item2"]
+ * }
+ *
+ * Represented as:
+ * message schema {
+ *   repeated binary my_primitive_array (STRING);
+ * }
+ *
+ * Instead of:
+ * message schema {
+ *   required group my_primitive_array (LIST) {
+ *     repeated group list {
+ *       optional binary element (STRING);
+ *     }
+ *   }
+ * }
+ *
+ * Here the repeated field is itself the list, and its type is the element type.
+ */
+public class PrimitiveRepeatedConverter extends GenericPrimitiveConverter {
+    private IAsterixListBuilder builder;
+
+    protected IMutableValueStorage tempStorage;
+
+    public PrimitiveRepeatedConverter(ATypeTag typeTag, 
AbstractComplexConverter parent, PrimitiveType type, int index,
+            ParquetConverterContext context) throws IOException {
+        super(typeTag, parent, type.getName(), index, context);
+    }
+
+    private void internalStart() {
+        tempStorage = context.enterCollection();
+        builder = 
context.getCollectionBuilder(DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE);
+    }
+
+    @Override
+    public void internalEnd() {
+        try {
+            builder.write(parent.getDataOutput(), true);
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+        parent.addValue(this);
+        context.exitCollection(tempStorage, builder);
+        tempStorage = null;
+        builder = null;
+    }
+
+    public DataOutput getDataOutput() {
+        if (tempStorage == null) {
+            internalStart();
+        }
+        tempStorage.reset();
+        return tempStorage.getDataOutput();
+    }
+
+    public void addValue() {
+        try {
+            builder.addItem(tempStorage);
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    public void addBinary(Binary value) {
+        context.serializeString(value, getDataOutput());
+        addValue();
+    }
+
+    @Override
+    public void addBoolean(boolean value) {
+        context.serializeBoolean(value, getDataOutput());
+        addValue();
+    }
+
+    @Override
+    public void addFloat(float value) {
+        addDouble(value);
+    }
+
+    @Override
+    public void addDouble(double value) {
+        context.serializeDouble(value, getDataOutput());
+        addValue();
+    }
+
+    @Override
+    public void addInt(int value) {
+        addLong(value);
+    }
+
+    @Override
+    public void addLong(long value) {
+        context.serializeInt64(value, getDataOutput());
+        addValue();
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimeConverter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimeConverter.java
index c78d6e7..e854a64 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimeConverter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimeConverter.java
@@ -21,8 +21,8 @@
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;

+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
 import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
-import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.parquet.schema.LogicalTypeAnnotation;

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimestampConverter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimestampConverter.java
index c36de37..4d0a61d 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimestampConverter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/TimestampConverter.java
@@ -22,8 +22,8 @@
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;

+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
 import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
-import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UUIDConverter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UUIDConverter.java
index 23667ca..4384359 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UUIDConverter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UUIDConverter.java
@@ -20,8 +20,8 @@

 import java.io.IOException;

+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
 import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
-import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.parquet.io.api.Binary;

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UnsignedIntegerConverter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UnsignedIntegerConverter.java
index 5bb4ad7..64b0ad7 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UnsignedIntegerConverter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/UnsignedIntegerConverter.java
@@ -21,8 +21,8 @@
 import java.io.IOException;

 import org.apache.asterix.common.exceptions.ErrorCode;
+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
 import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
-import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.nested.AbstractComplexConverter;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.api.exceptions.Warning;


--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18391
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I829df08c9ad667562194fbee027f63d8b2b31d4b
Gerrit-Change-Number: 18391
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>
Gerrit-MessageType: newchange

Reply via email to