>From Ritik Raj <[email protected]>: Ritik Raj has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20834?usp=email )
Change subject: [ASTERIXDB-3694][STO] Parquet reader INTERVAL support ...................................................................... [ASTERIXDB-3694][STO] Parquet reader INTERVAL support - user model changes: no - storage format changes: yes - interface changes: yes Details: Contains changes for writing duration(Parquet's Interval) and reading back the record via external dataset. Ext-ref: MB-70267 Change-Id: I8dcd0da5db34991280bc4655059ca418fd8a71c9 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20834 Reviewed-by: Ritik Raj <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Wail Alkowaileet <[email protected]> --- M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.01.ddl.sqlpp M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.02.update.sqlpp M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.03.update.sqlpp M asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-cover-data-types/parquet-cover-data-types.05.adm M asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/temporal/DurationValueReader.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/ParquetConverterContext.java A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/IntervalConverter.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/AsterixParquetTypeMap.java A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/FixedByteArrayOutputStream.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaBuilderUtils.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java 13 files changed, 200 insertions(+), 17 deletions(-) Approvals: Jenkins: Verified; Verified Ritik Raj: Looks good to me, but someone else must approve Wail Alkowaileet: Looks good to me, approved diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.01.ddl.sqlpp index 56e79c8..c94cb73 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.01.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.01.ddl.sqlpp @@ -27,4 +27,8 @@ name : string }; -CREATE COLLECTION TestCollection(ColumnType1) PRIMARY KEY id; \ No newline at end of file +CREATE COLLECTION TestCollection(ColumnType1) PRIMARY KEY id WITH { + "storage-format": { + "format": "column" + } +}; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.02.update.sqlpp index 4cd2ec7..5ebaa16 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.02.update.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.02.update.sqlpp @@ -29,5 +29,5 @@ insert into TestCollection({"id":`day-time-duration`("-P3829H849.392S"), "name": "Alex"}); */ -insert into TestCollection({"id":18, "name": "Virat" , "dateType":date("1988-11-05"), "timeType": time("03:10:00.493Z") , "boolType" : false , "doubleType" : 0.75, "datetimeType" : datetime("1900-02-01T00:00:00") , "uuidType" : uuid("95ca22dd-ef64-46f2-9c2a-a38005e23344") }); +insert into TestCollection({"id":18, "name": "Virat" , "dateType":date("1988-11-05"), "timeType": time("03:10:00.493Z") , "boolType" : false , "doubleType" : 0.75, "datetimeType" : datetime("1900-02-01T00:00:00"), "durationType": duration("P5Y12M1DT24H24M1.012S"), "uuidType" : uuid("95ca22dd-ef64-46f2-9c2a-a38005e23344") }); diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.03.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.03.update.sqlpp index 042fa99..e8eb644 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.03.update.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-cover-data-types/parquet-cover-data-types.03.update.sqlpp @@ -24,7 +24,7 @@ ) toWriter TO %adapter% PATH (%pathprefix% "copy-to-result", "parquet-cover-data-types") -TYPE ( { name : string, id : int, dateType : date, timeType : time, boolType : boolean, doubleType : double, datetimeType : datetime , uuidType : uuid } ) +TYPE ( { name : string, id : int, dateType : date, timeType : time, boolType : boolean, doubleType : double, datetimeType : datetime, durationType: duration, uuidType : uuid } ) WITH { %template_colons%, %additionalProperties% diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-cover-data-types/parquet-cover-data-types.05.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-cover-data-types/parquet-cover-data-types.05.adm index dd54be4..758326f 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-cover-data-types/parquet-cover-data-types.05.adm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-cover-data-types/parquet-cover-data-types.05.adm @@ -1 +1 @@ -{ "name": "Virat", "id": 18, "dateType": date("1988-11-05"), "timeType": time("03:10:00.493"), "boolType": false, "doubleType": 0.75, "datetimeType": datetime("1900-02-01T00:00:00.000"), "uuidType": uuid("95ca22dd-ef64-46f2-9c2a-a38005e23344") } \ No newline at end of file +{ "name": "Virat", "id": 18, "dateType": date("1988-11-05"), "timeType": time("03:10:00.493"), "boolType": false, "doubleType": 0.75, "datetimeType": datetime("1900-02-01T00:00:00.000"), "durationType": duration("P6Y2DT24M1.12S"), "uuidType": uuid("95ca22dd-ef64-46f2-9c2a-a38005e23344") } \ No newline at end of file diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/temporal/DurationValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/temporal/DurationValueReader.java index 2776325..7bd5683 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/temporal/DurationValueReader.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/temporal/DurationValueReader.java @@ -81,14 +81,18 @@ byte[] a = value.getByteArray(); int ao = value.getStartOffset(); int monthsA = IntegerPointable.getInteger(a, ao); - long millisA = LongPointable.getLong(a, ao + Integer.BYTES); byte[] b = other.getByteArray(); int bo = other.getStartOffset(); int monthsB = IntegerPointable.getInteger(b, bo); - long millisB = LongPointable.getLong(b, bo + Integer.BYTES); int cmp = Integer.compare(monthsA, monthsB); - return cmp != 0 ? cmp : Long.compare(millisA, millisB); + if (cmp != 0) { + return cmp; + } + + long millisA = LongPointable.getLong(a, ao + Integer.BYTES); + long millisB = LongPointable.getLong(b, bo + Integer.BYTES); + return Long.compare(millisA, millisB); } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java index 254280d..60a4029 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/AsterixTypeToParquetTypeVisitor.java @@ -44,6 +44,7 @@ import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.IntervalLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation; import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation; @@ -261,6 +262,8 @@ ATypeTag inferredTypeTag = ATypeTag.SYSTEM_NULL; if (logicalType == null || logicalType == LogicalTypeAnnotation.bsonType()) { inferredTypeTag = ATypeTag.BINARY; + } else if (logicalType instanceof IntervalLogicalTypeAnnotation) { + inferredTypeTag = ATypeTag.DURATION; } else if (logicalType == LogicalTypeAnnotation.stringType() || logicalType == LogicalTypeAnnotation.enumType()) { inferredTypeTag = ATypeTag.STRING; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/ParquetConverterContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/ParquetConverterContext.java index 7e9b3a20..5e9f464 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/ParquetConverterContext.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/ParquetConverterContext.java @@ -36,10 +36,12 @@ import org.apache.asterix.om.base.ADate; import org.apache.asterix.om.base.ADateTime; import org.apache.asterix.om.base.ADouble; +import org.apache.asterix.om.base.ADuration; import org.apache.asterix.om.base.AInt64; import org.apache.asterix.om.base.AMutableDate; import org.apache.asterix.om.base.AMutableDateTime; import org.apache.asterix.om.base.AMutableDouble; +import org.apache.asterix.om.base.AMutableDuration; import org.apache.asterix.om.base.AMutableInt64; import org.apache.asterix.om.base.AMutableTime; import org.apache.asterix.om.base.ANull; @@ -84,6 +86,9 @@ private final ISerializerDeserializer<ADateTime> datetimeSerDer = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATETIME); @SuppressWarnings("unchecked") + private final ISerializerDeserializer<ADuration> durationSerDer = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADURATION); + @SuppressWarnings("unchecked") private final ISerializerDeserializer<ANull> nullSerDer = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL); @@ -109,6 +114,7 @@ private final AMutableDate mutableDate = new AMutableDate(0); private final AMutableTime mutableTime = new AMutableTime(0); private final AMutableDateTime mutableDateTime = new AMutableDateTime(0); + private final AMutableDuration mutableDuration = new AMutableDuration(0, 0); /* * ************************************************************************ @@ -297,6 +303,15 @@ } } + public void serializeDuration(int months, long milliseconds, DataOutput output) { + try { + mutableDuration.setValue(months, milliseconds); + durationSerDer.serialize(mutableDuration, output); + } catch (HyracksDataException e) { + throw new IllegalStateException(e); + } + } + public void serializeNull(DataOutput output) { try { nullSerDer.serialize(ANull.NULL, output); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/IntervalConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/IntervalConverter.java new file mode 100644 index 0000000..9ef8374 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/IntervalConverter.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.primitve; + +import java.io.IOException; + +import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.AbstractComplexConverter; +import org.apache.asterix.external.input.record.reader.hdfs.parquet.converter.ParquetConverterContext; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.parquet.io.api.Binary; + +/** + * Converts Parquet's logical INTERVAL type (months:int32, days:int32, millis:int32) into Asterix {@link ATypeTag#DURATION} + * (months:int32, millis:int64) by folding days into milliseconds (days * 86400000 + millis). + */ +public class IntervalConverter extends GenericPrimitiveConverter { + private static final long MILLIS_PER_DAY = 86_400_000L; + + IntervalConverter(AbstractComplexConverter parent, String stringFieldName, int index, + ParquetConverterContext context) throws IOException { + super(ATypeTag.DURATION, parent, stringFieldName, index, context); + } + + @Override + public void addBinary(Binary value) { + byte[] bytes = value.getBytesUnsafe(); + int months = readIntLE(bytes, 0); + int days = readIntLE(bytes, 4); + int millis = readIntLE(bytes, 8); + + long dayTimeMillis = days * MILLIS_PER_DAY + millis; + context.serializeDuration(months, dayTimeMillis, parent.getDataOutput()); + parent.addValue(this); + } + + private static int readIntLE(byte[] b, int o) { + return (b[o] & 0xFF) | ((b[o + 1] & 0xFF) << 8) | ((b[o + 2] & 0xFF) << 16) | ((b[o + 3] & 0xFF) << 24); + } +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java index 0e16c21..cb55729 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/hdfs/parquet/converter/primitve/PrimitiveConverterProvider.java @@ -72,6 +72,8 @@ return getTimeConverter(type, parent, stringFieldName, index, context); case DATETIME: return getTimeStampConverter(type, parent, stringFieldName, index, context); + case DURATION: + return new IntervalConverter(parent, stringFieldName, index, context); case ANY: return new JsonStringConverter(parent, stringFieldName, index, context); default: diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/AsterixParquetTypeMap.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/AsterixParquetTypeMap.java index 6bdc586..976ac54 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/AsterixParquetTypeMap.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/AsterixParquetTypeMap.java @@ -46,15 +46,16 @@ Map.entry(ATypeTag.DATE, PrimitiveType.PrimitiveTypeName.INT32), Map.entry(ATypeTag.TIME, PrimitiveType.PrimitiveTypeName.INT32), Map.entry(ATypeTag.DATETIME, PrimitiveType.PrimitiveTypeName.INT64), + Map.entry(ATypeTag.DURATION, PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY), Map.entry(ATypeTag.UUID, PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)); - public static final Map<ATypeTag, LogicalTypeAnnotation> LOGICAL_TYPE_ANNOTATION_MAP = - Map.ofEntries(Map.entry(ATypeTag.STRING, LogicalTypeAnnotation.stringType()), - Map.entry(ATypeTag.UUID, LogicalTypeAnnotation.uuidType()), - Map.entry(ATypeTag.DATE, LogicalTypeAnnotation.dateType()), - Map.entry(ATypeTag.TIME, - LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)), - Map.entry(ATypeTag.DATETIME, - LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))); + public static final Map<ATypeTag, LogicalTypeAnnotation> LOGICAL_TYPE_ANNOTATION_MAP = Map.ofEntries( + Map.entry(ATypeTag.STRING, LogicalTypeAnnotation.stringType()), + Map.entry(ATypeTag.UUID, LogicalTypeAnnotation.uuidType()), + Map.entry(ATypeTag.DATE, LogicalTypeAnnotation.dateType()), + Map.entry(ATypeTag.TIME, LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)), + Map.entry(ATypeTag.DATETIME, + LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)), + Map.entry(ATypeTag.DURATION, LogicalTypeAnnotation.intervalType())); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/FixedByteArrayOutputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/FixedByteArrayOutputStream.java new file mode 100644 index 0000000..e42d817 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/FixedByteArrayOutputStream.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.writer.printer.parquet; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * Fixed-size output stream backed by a byte[] allocated once. + * reset() only resets the write position to 0 (no parameters, no resizing). + */ +final class FixedByteArrayOutputStream extends OutputStream { + private final byte[] bytes; + private int pos; + + FixedByteArrayOutputStream(int size) { + this.bytes = new byte[size]; + this.pos = 0; + } + + void reset() { + pos = 0; + } + + byte[] getByteArray() { + return bytes; + } + + @Override + public void write(int b) throws IOException { + if (pos >= bytes.length) { + throw new IOException("FixedByteArrayOutputStream overflow. size=" + bytes.length); + } + bytes[pos++] = (byte) b; + } +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaBuilderUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaBuilderUtils.java index da0cef0..914d6d7 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaBuilderUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaBuilderUtils.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.external.writer.printer.parquet; +import static org.apache.asterix.external.writer.printer.parquet.ParquetValueWriter.INTERVAL_BINARY_LENGTH; + import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Types; @@ -47,9 +49,19 @@ public static Types.Builder<?, ?> getPrimitiveChild(Types.Builder parent, PrimitiveType.PrimitiveTypeName type, LogicalTypeAnnotation annotation) { if (parent instanceof Types.BaseGroupBuilder) { - return ((Types.BaseGroupBuilder<?, ?>) parent).optional(type).as(annotation); + Types.PrimitiveBuilder<?> primitiveBuilder = + ((Types.BaseGroupBuilder<?, ?>) parent).optional(type).as(annotation); + if (annotation instanceof LogicalTypeAnnotation.IntervalLogicalTypeAnnotation) { + primitiveBuilder.length(INTERVAL_BINARY_LENGTH); + } + return primitiveBuilder; } else if (parent instanceof Types.BaseListBuilder) { - return ((Types.BaseListBuilder<?, ?>) parent).optionalElement(type).as(annotation); + Types.BaseListBuilder.ElementBuilder<?, ?> elementBuilder = + ((Types.BaseListBuilder<?, ?>) parent).optionalElement(type).as(annotation); + if (annotation instanceof LogicalTypeAnnotation.IntervalLogicalTypeAnnotation) { + elementBuilder.length(INTERVAL_BINARY_LENGTH); + } + return elementBuilder; } else { return null; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java index 38d12f9..77a839a 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java @@ -28,6 +28,7 @@ import org.apache.asterix.dataflow.data.nontagged.serde.ADateSerializerDeserializer; import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer; import org.apache.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer; +import org.apache.asterix.dataflow.data.nontagged.serde.ADurationSerializerDeserializer; import org.apache.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer; import org.apache.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer; import org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer; @@ -40,24 +41,29 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.primitive.VoidPointable; import org.apache.hyracks.util.string.UTF8StringUtil; +import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.RecordConsumer; import org.apache.parquet.schema.PrimitiveType; //This class reduces the number of Java objects created each time a column is written to a Parquet file by reusing the same VoidPointable for all columns within the file. public class ParquetValueWriter { + public static final int INTERVAL_BINARY_LENGTH = 12; public static final String LIST_FIELD = "list"; public static final String ELEMENT_FIELD = "element"; public static final String GROUP_TYPE_ERROR_FIELD = "group"; public static final String PRIMITIVE_TYPE_ERROR_FIELD = "primitive"; + public static final long MILLISECONDS_IN_A_DAY = 86_400_000L; private final VoidPointable voidPointable; private final ResettableByteArrayOutputStream byteArrayOutputStream; + private final FixedByteArrayOutputStream fixedLen12ByteStream; ParquetValueWriter() { this.voidPointable = VoidPointable.FACTORY.createPointable(); this.byteArrayOutputStream = new ResettableByteArrayOutputStream(); + fixedLen12ByteStream = new FixedByteArrayOutputStream(INTERVAL_BINARY_LENGTH); } private void addIntegerType(long value, PrimitiveType.PrimitiveTypeName primitiveTypeName, ATypeTag typeTag, @@ -189,6 +195,35 @@ long dateTimeValue = ADateTimeSerializerDeserializer.getChronon(b, s); addIntegerType(dateTimeValue, primitiveTypeName, typeTag, recordConsumer); break; + case DURATION: + if (primitiveTypeName != PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY + || type.getTypeLength() != 12) { + throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, primitiveTypeName, typeTag); + } + + int months = ADurationSerializerDeserializer.getYearMonth(b, s); + long totalMillis = ADurationSerializerDeserializer.getDayTime(b, s); + long daysLong = Math.floorDiv(totalMillis, MILLISECONDS_IN_A_DAY); + + if (daysLong < Integer.MIN_VALUE || daysLong > Integer.MAX_VALUE) { + throw RuntimeDataException.create(ErrorCode.TYPE_MISMATCH_GENERIC, primitiveTypeName, typeTag); + } + + int days = (int) daysLong; + long millisOfDayLong = totalMillis - (days * MILLISECONDS_IN_A_DAY); + int millisOfDay = (int) millisOfDayLong; + + fixedLen12ByteStream.reset(); + try { + BytesUtils.writeIntLittleEndian(fixedLen12ByteStream, months); + BytesUtils.writeIntLittleEndian(fixedLen12ByteStream, days); + BytesUtils.writeIntLittleEndian(fixedLen12ByteStream, millisOfDay); + } catch (IOException e) { + throw HyracksDataException.create(e); + } + + recordConsumer.addBinary( + Binary.fromReusedByteArray(fixedLen12ByteStream.getByteArray(), 0, INTERVAL_BINARY_LENGTH)); case UUID: recordConsumer.addBinary(Binary.fromReusedByteArray(b, s, l)); break; -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20834?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: I8dcd0da5db34991280bc4655059ca418fd8a71c9 Gerrit-Change-Number: 20834 Gerrit-PatchSet: 4 Gerrit-Owner: Ritik Raj <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Ritik Raj <[email protected]> Gerrit-Reviewer: Wail Alkowaileet <[email protected]>
