>From Ritik Raj <[email protected]>:
Ritik Raj has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20834?usp=email )
Change subject: [ASTERIXDB-3694][STO] Parquet reader INTERVAL support
......................................................................
[ASTERIXDB-3694][STO] Parquet reader INTERVAL support
- user model changes: no
- storage format changes: yes
- interface changes: yes
Details:
Adds support for writing duration values as Parquet INTERVAL (COPY TO)
and for reading them back as duration through an external dataset.
Ext-ref: MB-70267
Change-Id: I8dcd0da5db34991280bc4655059ca418fd8a71c9
---
M
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.01.ddl.sqlpp
M
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.02.update.sqlpp
M
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.03.update.sqlpp
M
asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-cover-data-types/parquet-cover-data-types.05.adm
M
asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/temporal/DurationValueReader.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/ParquetConverterContext.java
A
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/IntervalConverter.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/AsterixParquetTypeMap.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaBuilderUtils.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java
12 files changed, 148 insertions(+), 17 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/34/20834/1
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.01.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.01.ddl.sqlpp
index 56e79c8..c94cb73 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.01.ddl.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.01.ddl.sqlpp
@@ -27,4 +27,8 @@
name : string
};
-CREATE COLLECTION TestCollection(ColumnType1) PRIMARY KEY id;
\ No newline at end of file
+CREATE COLLECTION TestCollection(ColumnType1) PRIMARY KEY id WITH {
+ "storage-format": {
+ "format": "column"
+ }
+};
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.02.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.02.update.sqlpp
index 4cd2ec7..5ebaa16 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.02.update.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.02.update.sqlpp
@@ -29,5 +29,5 @@
insert into TestCollection({"id":`day-time-duration`("-P3829H849.392S"),
"name": "Alex"});
*/
-insert into TestCollection({"id":18, "name": "Virat" ,
"dateType":date("1988-11-05"), "timeType": time("03:10:00.493Z") , "boolType" :
false , "doubleType" : 0.75, "datetimeType" : datetime("1900-02-01T00:00:00") ,
"uuidType" : uuid("95ca22dd-ef64-46f2-9c2a-a38005e23344") });
+insert into TestCollection({"id":18, "name": "Virat" ,
"dateType":date("1988-11-05"), "timeType": time("03:10:00.493Z") , "boolType" :
false , "doubleType" : 0.75, "datetimeType" : datetime("1900-02-01T00:00:00"),
"durationType": duration("P5Y12M1DT24H24M1.012S"), "uuidType" :
uuid("95ca22dd-ef64-46f2-9c2a-a38005e23344") });
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.03.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.03.update.sqlpp
index 042fa99..e8eb644 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.03.update.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.03.update.sqlpp
@@ -24,7 +24,7 @@
) toWriter
TO %adapter%
PATH (%pathprefix% "copy-to-result", "parquet-cover-data-types")
-TYPE ( { name : string, id : int, dateType : date, timeType : time,
boolType : boolean, doubleType : double, datetimeType : datetime , uuidType
: uuid } )
+TYPE ( { name : string, id : int, dateType : date, timeType : time,
boolType : boolean, doubleType : double, datetimeType : datetime,
durationType: duration, uuidType : uuid } )
WITH {
%template_colons%,
%additionalProperties%
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-cover-data-types/parquet-cover-data-types.05.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-cover-data-types/parquet-cover-data-types.05.adm
index dd54be4..758326f 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-cover-data-types/parquet-cover-data-types.05.adm
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-cover-data-types/parquet-cover-data-types.05.adm
@@ -1 +1 @@
-{ "name": "Virat", "id": 18, "dateType": date("1988-11-05"), "timeType":
time("03:10:00.493"), "boolType": false, "doubleType": 0.75, "datetimeType":
datetime("1900-02-01T00:00:00.000"), "uuidType":
uuid("95ca22dd-ef64-46f2-9c2a-a38005e23344") }
\ No newline at end of file
+{ "name": "Virat", "id": 18, "dateType": date("1988-11-05"), "timeType":
time("03:10:00.493"), "boolType": false, "doubleType": 0.75, "datetimeType":
datetime("1900-02-01T00:00:00.000"), "durationType":
duration("P6Y2DT24M1.12S"), "uuidType":
uuid("95ca22dd-ef64-46f2-9c2a-a38005e23344") }
\ No newline at end of file
diff --git
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/temporal/DurationValueReader.java
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/temporal/DurationValueReader.java
index 2776325..7bd5683 100644
---
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/temporal/DurationValueReader.java
+++
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/temporal/DurationValueReader.java
@@ -81,14 +81,18 @@
byte[] a = value.getByteArray();
int ao = value.getStartOffset();
int monthsA = IntegerPointable.getInteger(a, ao);
- long millisA = LongPointable.getLong(a, ao + Integer.BYTES);
byte[] b = other.getByteArray();
int bo = other.getStartOffset();
int monthsB = IntegerPointable.getInteger(b, bo);
- long millisB = LongPointable.getLong(b, bo + Integer.BYTES);
int cmp = Integer.compare(monthsA, monthsB);
- return cmp != 0 ? cmp : Long.compare(millisA, millisB);
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ long millisA = LongPointable.getLong(a, ao + Integer.BYTES);
+ long millisB = LongPointable.getLong(b, bo + Integer.BYTES);
+ return Long.compare(millisA, millisB);
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java
index 254280d..60a4029 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java
@@ -44,6 +44,7 @@
import
org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
import
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import
org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation;
+import
org.apache.parquet.schema.LogicalTypeAnnotation.IntervalLogicalTypeAnnotation;
import
org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation;
import
org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
import
org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;
@@ -261,6 +262,8 @@
ATypeTag inferredTypeTag = ATypeTag.SYSTEM_NULL;
if (logicalType == null || logicalType ==
LogicalTypeAnnotation.bsonType()) {
inferredTypeTag = ATypeTag.BINARY;
+ } else if (logicalType instanceof IntervalLogicalTypeAnnotation) {
+ inferredTypeTag = ATypeTag.DURATION;
} else if (logicalType == LogicalTypeAnnotation.stringType()
|| logicalType == LogicalTypeAnnotation.enumType()) {
inferredTypeTag = ATypeTag.STRING;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/ParquetConverterContext.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/ParquetConverterContext.java
index 7e9b3a20..5e9f464 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/ParquetConverterContext.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/ParquetConverterContext.java
@@ -36,10 +36,12 @@
import org.apache.asterix.om.base.ADate;
import org.apache.asterix.om.base.ADateTime;
import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.ADuration;
import org.apache.asterix.om.base.AInt64;
import org.apache.asterix.om.base.AMutableDate;
import org.apache.asterix.om.base.AMutableDateTime;
import org.apache.asterix.om.base.AMutableDouble;
+import org.apache.asterix.om.base.AMutableDuration;
import org.apache.asterix.om.base.AMutableInt64;
import org.apache.asterix.om.base.AMutableTime;
import org.apache.asterix.om.base.ANull;
@@ -84,6 +86,9 @@
private final ISerializerDeserializer<ADateTime> datetimeSerDer =
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATETIME);
@SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<ADuration> durationSerDer =
+
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADURATION);
+ @SuppressWarnings("unchecked")
private final ISerializerDeserializer<ANull> nullSerDer =
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
@@ -109,6 +114,7 @@
private final AMutableDate mutableDate = new AMutableDate(0);
private final AMutableTime mutableTime = new AMutableTime(0);
private final AMutableDateTime mutableDateTime = new AMutableDateTime(0);
+ private final AMutableDuration mutableDuration = new AMutableDuration(0,
0);
/*
* ************************************************************************
@@ -297,6 +303,15 @@
}
}
+ public void serializeDuration(int months, long milliseconds, DataOutput
output) {
+ try {
+ mutableDuration.setValue(months, milliseconds);
+ durationSerDer.serialize(mutableDuration, output);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
public void serializeNull(DataOutput output) {
try {
nullSerDer.serialize(ANull.NULL, output);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/IntervalConverter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/IntervalConverter.java
new file mode 100644
index 0000000..71d9ffa
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/IntervalConverter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter;
+import
org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * Converts Parquet's logical INTERVAL type (months:int32, days:int32,
millis:int32) into Asterix {@link ATypeTag#DURATION}
+ * (months:int32, millis:int64) by folding days into milliseconds (days *
86400000 + millis).
+ */
+public class IntervalConverter extends GenericPrimitiveConverter {
+ private static final long MILLIS_PER_DAY = 86_400_000L;
+
+ IntervalConverter(AbstractComplexConverter parent, String stringFieldName,
int index,
+ ParquetConverterContext context) throws IOException {
+ super(ATypeTag.DURATION, parent, stringFieldName, index, context);
+ }
+
+ @Override
+ public void addBinary(Binary value) {
+ ByteBuffer buffer =
value.toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
+ int months = buffer.getInt();
+ int days = buffer.getInt();
+ int millis = buffer.getInt();
+
+ long dayTimeMillis = days * MILLIS_PER_DAY + millis;
+ context.serializeDuration(months, dayTimeMillis,
parent.getDataOutput());
+ parent.addValue(this);
+ }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java
index 0e16c21..cb55729 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java
@@ -72,6 +72,8 @@
return getTimeConverter(type, parent, stringFieldName, index,
context);
case DATETIME:
return getTimeStampConverter(type, parent, stringFieldName,
index, context);
+ case DURATION:
+ return new IntervalConverter(parent, stringFieldName, index,
context);
case ANY:
return new JsonStringConverter(parent, stringFieldName, index,
context);
default:
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/AsterixParquetTypeMap.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/AsterixParquetTypeMap.java
index 6bdc586..976ac54 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/AsterixParquetTypeMap.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/AsterixParquetTypeMap.java
@@ -46,15 +46,16 @@
Map.entry(ATypeTag.DATE,
PrimitiveType.PrimitiveTypeName.INT32),
Map.entry(ATypeTag.TIME,
PrimitiveType.PrimitiveTypeName.INT32),
Map.entry(ATypeTag.DATETIME,
PrimitiveType.PrimitiveTypeName.INT64),
+ Map.entry(ATypeTag.DURATION,
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY),
Map.entry(ATypeTag.UUID,
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY));
- public static final Map<ATypeTag, LogicalTypeAnnotation>
LOGICAL_TYPE_ANNOTATION_MAP =
- Map.ofEntries(Map.entry(ATypeTag.STRING,
LogicalTypeAnnotation.stringType()),
- Map.entry(ATypeTag.UUID, LogicalTypeAnnotation.uuidType()),
- Map.entry(ATypeTag.DATE, LogicalTypeAnnotation.dateType()),
- Map.entry(ATypeTag.TIME,
- LogicalTypeAnnotation.timeType(true,
LogicalTypeAnnotation.TimeUnit.MILLIS)),
- Map.entry(ATypeTag.DATETIME,
- LogicalTypeAnnotation.timestampType(true,
LogicalTypeAnnotation.TimeUnit.MILLIS)));
+ public static final Map<ATypeTag, LogicalTypeAnnotation>
LOGICAL_TYPE_ANNOTATION_MAP = Map.ofEntries(
+ Map.entry(ATypeTag.STRING, LogicalTypeAnnotation.stringType()),
+ Map.entry(ATypeTag.UUID, LogicalTypeAnnotation.uuidType()),
+ Map.entry(ATypeTag.DATE, LogicalTypeAnnotation.dateType()),
+ Map.entry(ATypeTag.TIME, LogicalTypeAnnotation.timeType(true,
LogicalTypeAnnotation.TimeUnit.MILLIS)),
+ Map.entry(ATypeTag.DATETIME,
+ LogicalTypeAnnotation.timestampType(true,
LogicalTypeAnnotation.TimeUnit.MILLIS)),
+ Map.entry(ATypeTag.DURATION,
LogicalTypeAnnotation.intervalType()));
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaBuilderUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaBuilderUtils.java
index da0cef0..914d6d7 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaBuilderUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaBuilderUtils.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.external.writer.printer.parquet;
+import static
org.apache.asterix.external.writer.printer.parquet.ParquetValueWriter.INTERVAL_BINARY_LENGTH;
+
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
@@ -47,9 +49,19 @@
public static Types.Builder<?, ?> getPrimitiveChild(Types.Builder parent,
PrimitiveType.PrimitiveTypeName type,
LogicalTypeAnnotation annotation) {
if (parent instanceof Types.BaseGroupBuilder) {
- return ((Types.BaseGroupBuilder<?, ?>)
parent).optional(type).as(annotation);
+ Types.PrimitiveBuilder<?> primitiveBuilder =
+ ((Types.BaseGroupBuilder<?, ?>)
parent).optional(type).as(annotation);
+ if (annotation instanceof
LogicalTypeAnnotation.IntervalLogicalTypeAnnotation) {
+ primitiveBuilder.length(INTERVAL_BINARY_LENGTH);
+ }
+ return primitiveBuilder;
} else if (parent instanceof Types.BaseListBuilder) {
- return ((Types.BaseListBuilder<?, ?>)
parent).optionalElement(type).as(annotation);
+ Types.BaseListBuilder.ElementBuilder<?, ?> elementBuilder =
+ ((Types.BaseListBuilder<?, ?>)
parent).optionalElement(type).as(annotation);
+ if (annotation instanceof
LogicalTypeAnnotation.IntervalLogicalTypeAnnotation) {
+ elementBuilder.length(INTERVAL_BINARY_LENGTH);
+ }
+ return elementBuilder;
} else {
return null;
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java
index 38d12f9..2899c06 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java
@@ -28,6 +28,7 @@
import
org.apache.asterix.dataflow.data.nontagged.serde.ADateSerializerDeserializer;
import
org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
import
org.apache.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import
org.apache.asterix.dataflow.data.nontagged.serde.ADurationSerializerDeserializer;
import
org.apache.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
import
org.apache.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
import
org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
@@ -46,6 +47,7 @@
//This class reduces the number of Java objects created each time a column is
written to a Parquet file by reusing the same VoidPointable for all columns
within the file.
public class ParquetValueWriter {
+ public static final int INTERVAL_BINARY_LENGTH = 12;
public static final String LIST_FIELD = "list";
public static final String ELEMENT_FIELD = "element";
@@ -54,10 +56,12 @@
private final VoidPointable voidPointable;
private final ResettableByteArrayOutputStream byteArrayOutputStream;
+ private final byte[] intervalBytes;
ParquetValueWriter() {
this.voidPointable = VoidPointable.FACTORY.createPointable();
this.byteArrayOutputStream = new ResettableByteArrayOutputStream();
+ this.intervalBytes = new byte[INTERVAL_BINARY_LENGTH];
}
private void addIntegerType(long value, PrimitiveType.PrimitiveTypeName
primitiveTypeName, ATypeTag typeTag,
@@ -189,6 +193,41 @@
long dateTimeValue =
ADateTimeSerializerDeserializer.getChronon(b, s);
addIntegerType(dateTimeValue, primitiveTypeName, typeTag,
recordConsumer);
break;
+ case DURATION:
+ if (primitiveTypeName !=
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
+ || type.getTypeLength() != INTERVAL_BINARY_LENGTH) {
+ throw
RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, primitiveTypeName,
typeTag);
+ }
+
+ int months = ADurationSerializerDeserializer.getYearMonth(b,
s);
+ long totalMillis =
ADurationSerializerDeserializer.getDayTime(b, s);
+ long daysLong = Math.floorDiv(totalMillis, 86_400_000L);
+
+ if (daysLong < Integer.MIN_VALUE || daysLong >
Integer.MAX_VALUE) {
+ throw
RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, primitiveTypeName,
typeTag);
+ }
+
+ int days = (int) daysLong;
+ long millisOfDayLong = totalMillis - (days * 86_400_000L);
+ int millisOfDay = (int) millisOfDayLong;
+
+ // Parquet INTERVAL is little-endian.
+ intervalBytes[0] = (byte) (months);
+ intervalBytes[1] = (byte) (months >>> 8);
+ intervalBytes[2] = (byte) (months >>> 16);
+ intervalBytes[3] = (byte) (months >>> 24);
+ intervalBytes[4] = (byte) (days);
+ intervalBytes[5] = (byte) (days >>> 8);
+ intervalBytes[6] = (byte) (days >>> 16);
+ intervalBytes[7] = (byte) (days >>> 24);
+ intervalBytes[8] = (byte) (millisOfDay);
+ intervalBytes[9] = (byte) (millisOfDay >>> 8);
+ intervalBytes[10] = (byte) (millisOfDay >>> 16);
+ intervalBytes[11] = (byte) (millisOfDay >>> 24);
+
+
recordConsumer.addBinary(Binary.fromReusedByteArray(intervalBytes, 0,
INTERVAL_BINARY_LENGTH));
+ // Stop here: falling through to UUID would emit a second binary value.
+ break;
case UUID:
recordConsumer.addBinary(Binary.fromReusedByteArray(b, s, l));
break;
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20834?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I8dcd0da5db34991280bc4655059ca418fd8a71c9
Gerrit-Change-Number: 20834
Gerrit-PatchSet: 1
Gerrit-Owner: Ritik Raj <[email protected]>