>From Murtadha Hubail <[email protected]>: Murtadha Hubail has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19068 )
Change subject: [ASTERIXDB-3503][EXT] Add optional parsing parameters for delta ...................................................................... [ASTERIXDB-3503][EXT] Add optional parsing parameters for delta - user model changes: yes - storage format changes: no - interface changes: no Details: - Add the following optional parsing parameters for delta: -- timestamp-to-long: parse timestamp as long; otherwise as ADateTime (default: true) -- date-to-int: parse date as int; otherwise as ADate (default: true) -- decimal-to-double: parse decimal as double; decimal columns are rejected unless enabled (default: false) -- timezone: Java time zone ID (validated) whose raw UTC offset is added to timestamp values Ext-ref: MB-63840 Change-Id: I6fa3d46588116508716fc1abd693f75ee5538d7f Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19068 Reviewed-by: Ali Alsuliman <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> --- A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java 5 files changed, 207 insertions(+), 33 deletions(-) Approvals: Ali Alsuliman: Looks good to me, approved Jenkins: Verified; Verified Anon. E. 
Moose #1000171: diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java new file mode 100644 index 0000000..81e465c --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.external.input.record.reader.aws.delta.converter; + +import java.io.DataOutput; +import java.util.Map; +import java.util.TimeZone; + +import org.apache.asterix.external.parser.jackson.ParserContext; +import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; +import org.apache.asterix.om.base.ADate; +import org.apache.asterix.om.base.ADateTime; +import org.apache.asterix.om.base.AMutableDate; +import org.apache.asterix.om.base.AMutableDateTime; +import org.apache.asterix.om.types.BuiltinType; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class DeltaConverterContext extends ParserContext { + @SuppressWarnings("unchecked") + private final ISerializerDeserializer<ADate> dateSerDer = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATE); + @SuppressWarnings("unchecked") + private final ISerializerDeserializer<ADateTime> datetimeSerDer = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATETIME); + private final boolean decimalToDouble; + private final boolean timestampAsLong; + private final boolean dateAsInt; + + private final int timeZoneOffset; + private final AMutableDate mutableDate = new AMutableDate(0); + private final AMutableDateTime mutableDateTime = new AMutableDateTime(0); + + public DeltaConverterContext(Map<String, String> configuration) { + decimalToDouble = Boolean.parseBoolean(configuration + .getOrDefault(ExternalDataConstants.DeltaOptions.DECIMAL_TO_DOUBLE, ExternalDataConstants.FALSE)); + timestampAsLong = Boolean.parseBoolean(configuration + .getOrDefault(ExternalDataConstants.DeltaOptions.TIMESTAMP_AS_LONG, ExternalDataConstants.TRUE)); + dateAsInt = Boolean.parseBoolean( + configuration.getOrDefault(ExternalDataConstants.DeltaOptions.DATE_AS_INT, 
ExternalDataConstants.TRUE)); + String configuredTimeZoneId = configuration.get(ExternalDataConstants.DeltaOptions.TIMEZONE); + if (configuredTimeZoneId != null && !configuredTimeZoneId.isEmpty()) { + timeZoneOffset = TimeZone.getTimeZone(configuredTimeZoneId).getRawOffset(); + } else { + timeZoneOffset = 0; + } + } + + public void serializeDate(int value, DataOutput output) { + try { + mutableDate.setValue(value); + dateSerDer.serialize(mutableDate, output); + } catch (HyracksDataException e) { + throw new IllegalStateException(e); + } + } + + public void serializeDateTime(long timestamp, DataOutput output) { + try { + mutableDateTime.setValue(timestamp); + datetimeSerDer.serialize(mutableDateTime, output); + } catch (HyracksDataException e) { + throw new IllegalStateException(e); + } + } + + public boolean isDecimalToDoubleEnabled() { + return decimalToDouble; + } + + public int getTimeZoneOffset() { + return timeZoneOffset; + } + + public boolean isTimestampAsLong() { + return timestampAsLong; + } + + public boolean isDateAsInt() { + return dateAsInt; + } +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java index e56be86..ea02d77 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java @@ -18,12 +18,12 @@ */ package org.apache.asterix.external.parser; -import static org.apache.avro.Schema.Type.NULL; import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; import java.io.DataOutput; import java.io.IOException; import java.math.BigDecimal; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.asterix.builders.IARecordBuilder; @@ -34,7 +34,8 @@ import 
org.apache.asterix.external.api.IRawRecord; import org.apache.asterix.external.api.IRecordDataParser; import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder; -import org.apache.asterix.external.parser.jackson.ParserContext; +import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext; +import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.om.base.ABoolean; import org.apache.asterix.om.base.ANull; import org.apache.asterix.om.pointables.base.DefaultOpenFieldType; @@ -63,11 +64,11 @@ import io.delta.kernel.types.TimestampType; public class DeltaDataParser extends AbstractDataParser implements IRecordDataParser<Row> { - private final ParserContext parserContext; + private final DeltaConverterContext parserContext; private final IExternalFilterValueEmbedder valueEmbedder; - public DeltaDataParser(IExternalDataRuntimeContext context) { - parserContext = new ParserContext(); + public DeltaDataParser(IExternalDataRuntimeContext context, Map<String, String> conf) { + parserContext = new DeltaConverterContext(conf); valueEmbedder = context.getValueEmbedder(); } @@ -160,7 +161,6 @@ if (isNull) { return ATypeTag.NULL; } - if (schema instanceof BooleanType) { return ATypeTag.BOOLEAN; } else if (schema instanceof ShortType || schema instanceof IntegerType || schema instanceof LongType) { @@ -170,15 +170,24 @@ } else if (schema instanceof StringType) { return ATypeTag.STRING; } else if (schema instanceof DateType) { - return ATypeTag.BIGINT; + if (parserContext.isDateAsInt()) { + return ATypeTag.INTEGER; + } + return ATypeTag.DATE; } else if (schema instanceof TimestampType || schema instanceof TimestampNTZType) { - return ATypeTag.BIGINT; + if (parserContext.isTimestampAsLong()) { + return ATypeTag.BIGINT; + } + return ATypeTag.DATETIME; } else if (schema instanceof BinaryType) { return ATypeTag.BINARY; } else if (schema instanceof ArrayType) { return ATypeTag.ARRAY; } else if 
(schema instanceof StructType) { return ATypeTag.OBJECT; + } else if (schema instanceof DecimalType) { + ensureDecimalToDoubleEnabled(schema, parserContext); + return ATypeTag.DOUBLE; } else { throw createUnsupportedException(schema); } @@ -204,11 +213,26 @@ } else if (schema instanceof StringType) { serializeString(row.getString(index), out); } else if (schema instanceof DateType) { - serializeDate(row.getInt(index), out); + if (parserContext.isDateAsInt()) { + serializeLong(row.getInt(index), out); + } else { + parserContext.serializeDate(row.getInt(index), out); + } } else if (schema instanceof TimestampType) { - serializeTimestamp(row.getLong(index), out); + long timeStampInMillis = TimeUnit.MICROSECONDS.toMillis(row.getLong(index)); + int offset = parserContext.getTimeZoneOffset(); + if (parserContext.isTimestampAsLong()) { + serializeLong(timeStampInMillis + offset, out); + } else { + parserContext.serializeDateTime(timeStampInMillis + offset, out); + } } else if (schema instanceof TimestampNTZType) { - serializeTimestamp(row.getLong(index), out); + long timeStampInMillis = TimeUnit.MICROSECONDS.toMillis(row.getLong(index)); + if (parserContext.isTimestampAsLong()) { + serializeLong(timeStampInMillis, out); + } else { + parserContext.serializeDateTime(timeStampInMillis, out); + } } else if (schema instanceof StructType) { parseObject(row.getStruct(index), out); } else if (schema instanceof ArrayType) { @@ -240,11 +264,26 @@ } else if (schema instanceof StringType) { serializeString(column.getString(index), out); } else if (schema instanceof DateType) { - serializeDate(column.getInt(index), out); + if (parserContext.isDateAsInt()) { + serializeLong(column.getInt(index), out); + } else { + parserContext.serializeDate(column.getInt(index), out); + } } else if (schema instanceof TimestampType) { - serializeTimestamp(column.getLong(index), out); + long timeStampInMillis = TimeUnit.MICROSECONDS.toMillis(column.getLong(index)); + int offset = 
parserContext.getTimeZoneOffset(); + if (parserContext.isTimestampAsLong()) { + serializeLong(timeStampInMillis + offset, out); + } else { + parserContext.serializeDateTime(timeStampInMillis + offset, out); + } } else if (schema instanceof TimestampNTZType) { - serializeTimestamp(column.getLong(index), out); + long timeStampInMillis = TimeUnit.MICROSECONDS.toMillis(column.getLong(index)); + if (parserContext.isTimestampAsLong()) { + serializeLong(timeStampInMillis, out); + } else { + parserContext.serializeDateTime(timeStampInMillis, out); + } } else if (schema instanceof ArrayType) { parseArray((ArrayType) schema, column.getArray(index), out); } else if (schema instanceof StructType) { @@ -273,16 +312,6 @@ stringSerde.serialize(aString, out); } - private void serializeDate(Object value, DataOutput out) throws HyracksDataException { - aInt32.setValue((Integer) value); - int32Serde.serialize(aInt32, out); - } - - private void serializeTimestamp(Object value, DataOutput out) throws HyracksDataException { - aInt64.setValue(TimeUnit.MICROSECONDS.toMillis((Long) value)); - int64Serde.serialize(aInt64, out); - } - private void serializeDecimal(BigDecimal value, DataOutput out) throws HyracksDataException { serializeDouble(value.doubleValue(), out); } @@ -290,4 +319,12 @@ private static HyracksDataException createUnsupportedException(DataType schema) { return new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, "Delta Parser", schema.toString()); } + + private static void ensureDecimalToDoubleEnabled(DataType type, DeltaConverterContext context) + throws RuntimeDataException { + if (!context.isDecimalToDoubleEnabled()) { + throw new RuntimeDataException(ErrorCode.PARQUET_SUPPORTED_TYPE_WITH_OPTION, type.toString(), + ExternalDataConstants.ParquetOptions.DECIMAL_TO_DOUBLE); + } + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java index 5d4b2dd..7f28105 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java @@ -60,7 +60,6 @@ } private DeltaDataParser createParser(IExternalDataRuntimeContext context) { - return new DeltaDataParser(context); + return new DeltaDataParser(context, configuration); } - } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java index d8f89c2..ffe75cb 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java @@ -19,7 +19,6 @@ package org.apache.asterix.external.util; import java.util.Set; -import java.util.TimeZone; import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.regex.Pattern; @@ -328,6 +327,16 @@ WRITER_SUPPORTED_COMPRESSION = Set.of(KEY_COMPRESSION_GZIP); } + public static class DeltaOptions { + private DeltaOptions() { + } + + public static final String DECIMAL_TO_DOUBLE = "decimal-to-double"; + public static final String TIMESTAMP_AS_LONG = "timestamp-to-long"; + public static final String DATE_AS_INT = "date-to-int"; + public static final String TIMEZONE = "timezone"; + } + public static class ParquetOptions { private ParquetOptions() { } @@ -357,10 +366,5 @@ */ public static final String TIMEZONE = "timezone"; public static final String HADOOP_TIMEZONE = ASTERIX_HADOOP_PREFIX + TIMEZONE; - - /** - * Valid time zones that are supported by Java - */ - 
public static final Set<String> VALID_TIME_ZONES = Set.of(TimeZone.getAvailableIDs()); } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index bf74079..bc5b8c3 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -48,6 +48,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.TimeZone; import java.util.function.BiPredicate; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -109,6 +111,8 @@ import org.apache.iceberg.io.CloseableIterable; public class ExternalDataUtils { + + private static final Set<String> validTimeZones = Set.of(TimeZone.getAvailableIDs()); private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = new EnumMap<>(ATypeTag.class); private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024; private static final int HEADER_FUDGE = 64; @@ -493,6 +497,11 @@ throw new CompilationException(ErrorCode.INVALID_DELTA_TABLE_FORMAT, configuration.get(ExternalDataConstants.KEY_FORMAT)); } + if (configuration.containsKey(ExternalDataConstants.DeltaOptions.TIMEZONE) + && !validTimeZones.contains(configuration.get(ExternalDataConstants.DeltaOptions.TIMEZONE))) { + throw new CompilationException(ErrorCode.INVALID_TIMEZONE, + configuration.get(ExternalDataConstants.DeltaOptions.TIMEZONE)); + } } public static void prepareIcebergTableFormat(Map<String, String> configuration, Configuration conf, @@ -928,7 +937,7 @@ if (datasetRecordType.getFieldTypes().length != 0) { throw new CompilationException(ErrorCode.UNSUPPORTED_TYPE_FOR_PARQUET, datasetRecordType.getTypeName()); } else if 
(properties.containsKey(ParquetOptions.TIMEZONE) - && !ParquetOptions.VALID_TIME_ZONES.contains(properties.get(ParquetOptions.TIMEZONE))) { + && !validTimeZones.contains(properties.get(ParquetOptions.TIMEZONE))) { //Ensure the configured time zone id is correct throw new CompilationException(ErrorCode.INVALID_TIMEZONE, properties.get(ParquetOptions.TIMEZONE)); } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19068 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: goldfish Gerrit-Change-Id: I6fa3d46588116508716fc1abd693f75ee5538d7f Gerrit-Change-Number: 19068 Gerrit-PatchSet: 6 Gerrit-Owner: Ayush Tripathi <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-MessageType: merged
