From Ayush Tripathi <[email protected]>: Ayush Tripathi has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19068 )
Change subject: Making decimal-to-double,timestamp processing conversions ...................................................................... Making decimal-to-double,timestamp processing conversions Change-Id: I6fa3d46588116508716fc1abd693f75ee5538d7f --- A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java 4 files changed, 200 insertions(+), 27 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/68/19068/1 diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java new file mode 100644 index 0000000..f105297 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.input.record.reader.aws.delta.converter; + +import java.io.DataOutput; +import java.util.Map; +import java.util.TimeZone; + +import org.apache.asterix.external.parser.jackson.ParserContext; +import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; +import org.apache.asterix.om.base.*; +import org.apache.asterix.om.types.BuiltinType; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class DeltaConverterContext extends ParserContext { + private final ISerializerDeserializer<ADate> dateSerDer = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATE); + @SuppressWarnings("unchecked") + private final ISerializerDeserializer<ATime> timeSerDer = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ATIME); + @SuppressWarnings("unchecked") + private final ISerializerDeserializer<ADateTime> datetimeSerDer = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATETIME); + private final boolean parseJson; + private final boolean decimalToDouble; + private final boolean timestampAsLong; + private final boolean dateAsInt; + + private final String timeZoneId; + private final int timeZoneOffset; + private final AMutableDate mutableDate = new AMutableDate(0); + private final AMutableTime mutableTime = new AMutableTime(0); + private final AMutableDateTime 
mutableDateTime = new AMutableDateTime(0); + + public DeltaConverterContext(Map<String, String> configuration) { + parseJson = Boolean.parseBoolean( + configuration.getOrDefault(ExternalDataConstants.DeltaOptions.PARSE_JSON_STRING, "false")); + decimalToDouble = Boolean.parseBoolean( + configuration.getOrDefault(ExternalDataConstants.DeltaOptions.DECIMAL_TO_DOUBLE, "false")); + timestampAsLong = Boolean + .parseBoolean(configuration.getOrDefault(ExternalDataConstants.DeltaOptions.TIMESTAMP_AS_LONG, "true")); + dateAsInt = Boolean + .parseBoolean(configuration.getOrDefault(ExternalDataConstants.DeltaOptions.DATE_AS_INT, "true")); + String configuredTimeZoneId = configuration.get(ExternalDataConstants.DeltaOptions.TIMEZONE); + if (configuredTimeZoneId != null && !configuredTimeZoneId.isEmpty()) { + timeZoneId = configuredTimeZoneId; + timeZoneOffset = TimeZone.getTimeZone(timeZoneId).getRawOffset(); + } else { + timeZoneId = ""; + timeZoneOffset = 0; + } + } + + public void serializeDate(int value, DataOutput output) { + try { + mutableDate.setValue(value); + dateSerDer.serialize(mutableDate, output); + } catch (HyracksDataException e) { + throw new IllegalStateException(e); + } + } + + public void serializeTime(int value, DataOutput output) { + try { + mutableTime.setValue(value); + timeSerDer.serialize(mutableTime, output); + } catch (HyracksDataException e) { + throw new IllegalStateException(e); + } + } + + public void serializeDateTime(long timestamp, DataOutput output) { + try { + mutableDateTime.setValue(timestamp); + datetimeSerDer.serialize(mutableDateTime, output); + } catch (HyracksDataException e) { + throw new IllegalStateException(e); + } + } + + public boolean isParseJsonEnabled() { + return parseJson; + } + + public boolean isDecimalToDoubleEnabled() { + return decimalToDouble; + } + + public String getTimeZoneId() { + return timeZoneId; + } + + public int getTimeZoneOffset() { + return timeZoneOffset; + } + + public boolean isTimestampAsLong() { + 
return timestampAsLong; + } + + public boolean isDateAsInt() { + return dateAsInt; + } +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java index e56be86..9871560 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java @@ -18,13 +18,12 @@ */ package org.apache.asterix.external.parser; -import static org.apache.avro.Schema.Type.NULL; import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; import java.io.DataOutput; import java.io.IOException; import java.math.BigDecimal; -import java.util.concurrent.TimeUnit; +import java.util.Map; import org.apache.asterix.builders.IARecordBuilder; import org.apache.asterix.builders.IAsterixListBuilder; @@ -34,7 +33,9 @@ import org.apache.asterix.external.api.IRawRecord; import org.apache.asterix.external.api.IRecordDataParser; import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder; -import org.apache.asterix.external.parser.jackson.ParserContext; +import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext; +import org.apache.asterix.external.input.record.reader.hdfs.parquet.AsterixParquetRuntimeException; +import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.om.base.ABoolean; import org.apache.asterix.om.base.ANull; import org.apache.asterix.om.pointables.base.DefaultOpenFieldType; @@ -63,11 +64,11 @@ import io.delta.kernel.types.TimestampType; public class DeltaDataParser extends AbstractDataParser implements IRecordDataParser<Row> { - private final ParserContext parserContext; + private final DeltaConverterContext parserContext; private final IExternalFilterValueEmbedder 
valueEmbedder; - public DeltaDataParser(IExternalDataRuntimeContext context) { - parserContext = new ParserContext(); + public DeltaDataParser(IExternalDataRuntimeContext context, Map<String, String> conf) { + parserContext = new DeltaConverterContext(conf); valueEmbedder = context.getValueEmbedder(); } @@ -160,7 +161,6 @@ if (isNull) { return ATypeTag.NULL; } - if (schema instanceof BooleanType) { return ATypeTag.BOOLEAN; } else if (schema instanceof ShortType || schema instanceof IntegerType || schema instanceof LongType) { @@ -170,15 +170,24 @@ } else if (schema instanceof StringType) { return ATypeTag.STRING; } else if (schema instanceof DateType) { - return ATypeTag.BIGINT; + if (parserContext.isDateAsInt()) { + return ATypeTag.INTEGER; + } + return ATypeTag.DATE; } else if (schema instanceof TimestampType || schema instanceof TimestampNTZType) { - return ATypeTag.BIGINT; + if (parserContext.isTimestampAsLong()) { + return ATypeTag.BIGINT; + } + return ATypeTag.DATETIME; } else if (schema instanceof BinaryType) { return ATypeTag.BINARY; } else if (schema instanceof ArrayType) { return ATypeTag.ARRAY; } else if (schema instanceof StructType) { return ATypeTag.OBJECT; + } else if (schema instanceof DecimalType) { + ensureDecimalToDoubleEnabled(schema, parserContext); + return ATypeTag.DOUBLE; } else { throw createUnsupportedException(schema); } @@ -204,11 +213,22 @@ } else if (schema instanceof StringType) { serializeString(row.getString(index), out); } else if (schema instanceof DateType) { - serializeDate(row.getInt(index), out); + if (parserContext.isDateAsInt()) { + serializeLong(row.getInt(index), out); + } + parserContext.serializeDate(row.getInt(index), out); } else if (schema instanceof TimestampType) { - serializeTimestamp(row.getLong(index), out); + long timeStamp = row.getLong(index); + int offset = parserContext.getTimeZoneOffset(); + if (parserContext.isTimestampAsLong()) { + serializeLong(timeStamp + offset, out); + } + 
parserContext.serializeDateTime(timeStamp + offset, out); } else if (schema instanceof TimestampNTZType) { - serializeTimestamp(row.getLong(index), out); + if (parserContext.isTimestampAsLong()) { + serializeLong(row.getLong(index), out); + } + parserContext.serializeDateTime(row.getLong(index), out); } else if (schema instanceof StructType) { parseObject(row.getStruct(index), out); } else if (schema instanceof ArrayType) { @@ -240,11 +260,22 @@ } else if (schema instanceof StringType) { serializeString(column.getString(index), out); } else if (schema instanceof DateType) { - serializeDate(column.getInt(index), out); + if (parserContext.isDateAsInt()) { + serializeLong(column.getInt(index), out); + } + parserContext.serializeDate(column.getInt(index), out); } else if (schema instanceof TimestampType) { - serializeTimestamp(column.getLong(index), out); + long timeStamp = column.getLong(index); + int offset = parserContext.getTimeZoneOffset(); + if (parserContext.isTimestampAsLong()) { + serializeLong(timeStamp + offset, out); + } + parserContext.serializeDateTime(timeStamp + offset, out); } else if (schema instanceof TimestampNTZType) { - serializeTimestamp(column.getLong(index), out); + if (parserContext.isTimestampAsLong()) { + serializeLong(column.getLong(index), out); + } + parserContext.serializeDateTime(column.getLong(index), out); } else if (schema instanceof ArrayType) { parseArray((ArrayType) schema, column.getArray(index), out); } else if (schema instanceof StructType) { @@ -273,16 +304,6 @@ stringSerde.serialize(aString, out); } - private void serializeDate(Object value, DataOutput out) throws HyracksDataException { - aInt32.setValue((Integer) value); - int32Serde.serialize(aInt32, out); - } - - private void serializeTimestamp(Object value, DataOutput out) throws HyracksDataException { - aInt64.setValue(TimeUnit.MICROSECONDS.toMillis((Long) value)); - int64Serde.serialize(aInt64, out); - } - private void serializeDecimal(BigDecimal value, DataOutput out) 
throws HyracksDataException { serializeDouble(value.doubleValue(), out); } @@ -290,4 +311,12 @@ private static HyracksDataException createUnsupportedException(DataType schema) { return new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, "Delta Parser", schema.toString()); } + + private static void ensureDecimalToDoubleEnabled(DataType type, DeltaConverterContext context) { + if (!context.isDecimalToDoubleEnabled()) { + throw new AsterixParquetRuntimeException( + new RuntimeDataException(ErrorCode.PARQUET_SUPPORTED_TYPE_WITH_OPTION, type.toString(), + ExternalDataConstants.ParquetOptions.DECIMAL_TO_DOUBLE)); + } + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java index 5d4b2dd..95868c4 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java @@ -60,7 +60,6 @@ } private DeltaDataParser createParser(IExternalDataRuntimeContext context) { - return new DeltaDataParser(context); + return new DeltaDataParser(context, this.configuration); } - } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java index d8f89c2..23f627e 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java @@ -328,6 +328,20 @@ WRITER_SUPPORTED_COMPRESSION = Set.of(KEY_COMPRESSION_GZIP); } + public static class DeltaOptions { + private DeltaOptions() { 
+ + } + + public static final String PARSE_JSON_STRING = "parse-json-string"; + public static final String DECIMAL_TO_DOUBLE = "decimal-to-double"; + public static final String TIMESTAMP_AS_LONG = "decimal-to-double"; + public static final String DATE_AS_INT = "decimal-to-double"; + + public static final String TIMEZONE = "timezone"; + public static final Set<String> VALID_TIME_ZONES = Set.of(TimeZone.getAvailableIDs()); + } + public static class ParquetOptions { private ParquetOptions() { } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19068 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: goldfish Gerrit-Change-Id: I6fa3d46588116508716fc1abd693f75ee5538d7f Gerrit-Change-Number: 19068 Gerrit-PatchSet: 1 Gerrit-Owner: Ayush Tripathi <[email protected]> Gerrit-MessageType: newchange
