>From Ayush Tripathi <[email protected]>:

Ayush Tripathi has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19068 )


Change subject: Add decimal-to-double, timestamp and date conversion options to the Delta parser
......................................................................

Add decimal-to-double, timestamp and date conversion options to the Delta parser

Change-Id: I6fa3d46588116508716fc1abd693f75ee5538d7f
---
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
4 files changed, 200 insertions(+), 27 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/68/19068/1

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java
new file mode 100644
index 0000000..f105297
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta.converter;
+
+import java.io.DataOutput;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.asterix.external.parser.jackson.ParserContext;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.ADate;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.AMutableDate;
+import org.apache.asterix.om.base.AMutableDateTime;
+import org.apache.asterix.om.base.AMutableTime;
+import org.apache.asterix.om.base.ATime;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Delta-specific conversion options (decimal, date, timestamp and time-zone handling) read from the external
+ * dataset configuration, plus reusable mutable values for serializing temporal types.
+ */
+public class DeltaConverterContext extends ParserContext {
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<ADate> dateSerDer =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATE);
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<ATime> timeSerDer =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ATIME);
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<ADateTime> datetimeSerDer =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATETIME);
+    private final boolean parseJson;
+    private final boolean decimalToDouble;
+    private final boolean timestampAsLong;
+    private final boolean dateAsInt;
+
+    private final String timeZoneId;
+    // Raw offset of timeZoneId in milliseconds (0 when no time zone is configured).
+    private final int timeZoneOffset;
+    private final AMutableDate mutableDate = new AMutableDate(0);
+    private final AMutableTime mutableTime = new AMutableTime(0);
+    private final AMutableDateTime mutableDateTime = new AMutableDateTime(0);
+
+    /**
+     * @param configuration external-dataset options, keyed by {@link ExternalDataConstants.DeltaOptions}
+     * @throws IllegalArgumentException if a non-empty time zone is configured that is not a known time-zone ID
+     */
+    public DeltaConverterContext(Map<String, String> configuration) {
+        parseJson = Boolean.parseBoolean(
+                configuration.getOrDefault(ExternalDataConstants.DeltaOptions.PARSE_JSON_STRING, "false"));
+        decimalToDouble = Boolean.parseBoolean(
+                configuration.getOrDefault(ExternalDataConstants.DeltaOptions.DECIMAL_TO_DOUBLE, "false"));
+        timestampAsLong = Boolean.parseBoolean(
+                configuration.getOrDefault(ExternalDataConstants.DeltaOptions.TIMESTAMP_AS_LONG, "true"));
+        dateAsInt = Boolean.parseBoolean(
+                configuration.getOrDefault(ExternalDataConstants.DeltaOptions.DATE_AS_INT, "true"));
+        String configuredTimeZoneId = configuration.get(ExternalDataConstants.DeltaOptions.TIMEZONE);
+        if (configuredTimeZoneId != null && !configuredTimeZoneId.isEmpty()) {
+            // TimeZone.getTimeZone() silently falls back to GMT for unknown IDs, so reject them up front.
+            if (!ExternalDataConstants.DeltaOptions.VALID_TIME_ZONES.contains(configuredTimeZoneId)) {
+                throw new IllegalArgumentException("Invalid time zone: " + configuredTimeZoneId);
+            }
+            timeZoneId = configuredTimeZoneId;
+            // getRawOffset() ignores daylight saving time, so one fixed offset is applied to every value.
+            timeZoneOffset = TimeZone.getTimeZone(timeZoneId).getRawOffset();
+        } else {
+            timeZoneId = "";
+            timeZoneOffset = 0;
+        }
+    }
+
+    /** Writes {@code value} as an ADATE using the non-tagged date serializer. */
+    public void serializeDate(int value, DataOutput output) {
+        try {
+            mutableDate.setValue(value);
+            dateSerDer.serialize(mutableDate, output);
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /** Writes {@code value} as an ATIME using the non-tagged time serializer. */
+    public void serializeTime(int value, DataOutput output) {
+        try {
+            mutableTime.setValue(value);
+            timeSerDer.serialize(mutableTime, output);
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /** Writes {@code timestamp} as an ADATETIME using the non-tagged datetime serializer. */
+    public void serializeDateTime(long timestamp, DataOutput output) {
+        try {
+            mutableDateTime.setValue(timestamp);
+            datetimeSerDer.serialize(mutableDateTime, output);
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /** Returns whether the parse-json-string option is enabled. */
+    public boolean isParseJsonEnabled() {
+        return parseJson;
+    }
+
+    /** Returns whether Delta DECIMAL values may be converted to DOUBLE. */
+    public boolean isDecimalToDoubleEnabled() {
+        return decimalToDouble;
+    }
+
+    /** Returns the configured time-zone ID, or an empty string if none was configured. */
+    public String getTimeZoneId() {
+        return timeZoneId;
+    }
+
+    /** Returns the raw offset of the configured time zone in milliseconds, or 0 if none was configured. */
+    public int getTimeZoneOffset() {
+        return timeZoneOffset;
+    }
+
+    /** Returns whether timestamps should be emitted as BIGINT instead of DATETIME. */
+    public boolean isTimestampAsLong() {
+        return timestampAsLong;
+    }
+
+    /** Returns whether dates should be emitted as integers instead of DATE. */
+    public boolean isDateAsInt() {
+        return dateAsInt;
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
index e56be86..9871560 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
@@ -18,13 +18,12 @@
  */
 package org.apache.asterix.external.parser;

-import static org.apache.avro.Schema.Type.NULL;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;

 import java.io.DataOutput;
 import java.io.IOException;
 import java.math.BigDecimal;
-import java.util.concurrent.TimeUnit;
+import java.util.Map;

 import org.apache.asterix.builders.IARecordBuilder;
 import org.apache.asterix.builders.IAsterixListBuilder;
@@ -34,7 +33,9 @@
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
 import 
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
-import org.apache.asterix.external.parser.jackson.ParserContext;
+import 
org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
+import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.AsterixParquetRuntimeException;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.om.base.ABoolean;
 import org.apache.asterix.om.base.ANull;
 import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
@@ -63,11 +64,17 @@
 import io.delta.kernel.types.TimestampType;

 public class DeltaDataParser extends AbstractDataParser implements IRecordDataParser<Row> {
-    private final ParserContext parserContext;
+    private final DeltaConverterContext parserContext;
     private final IExternalFilterValueEmbedder valueEmbedder;

-    public DeltaDataParser(IExternalDataRuntimeContext context) {
-        parserContext = new ParserContext();
+    /**
+     * Creates a Delta row parser.
+     *
+     * @param context       runtime context; supplies the external filter value embedder
+     * @param configuration external-dataset options handed to {@link DeltaConverterContext}
+     */
+    public DeltaDataParser(IExternalDataRuntimeContext context, Map<String, String> configuration) {
+        parserContext = new DeltaConverterContext(configuration);
         valueEmbedder = context.getValueEmbedder();
     }

@@ -160,7 +161,6 @@
         if (isNull) {
             return ATypeTag.NULL;
         }
-
         if (schema instanceof BooleanType) {
             return ATypeTag.BOOLEAN;
         } else if (schema instanceof ShortType || schema instanceof IntegerType || schema instanceof LongType) {
@@ -170,15 +170,19 @@
         } else if (schema instanceof StringType) {
             return ATypeTag.STRING;
         } else if (schema instanceof DateType) {
-            return ATypeTag.BIGINT;
+            return parserContext.isDateAsInt() ? ATypeTag.INTEGER : ATypeTag.DATE;
         } else if (schema instanceof TimestampType || schema instanceof TimestampNTZType) {
-            return ATypeTag.BIGINT;
+            return parserContext.isTimestampAsLong() ? ATypeTag.BIGINT : ATypeTag.DATETIME;
         } else if (schema instanceof BinaryType) {
             return ATypeTag.BINARY;
         } else if (schema instanceof ArrayType) {
             return ATypeTag.ARRAY;
         } else if (schema instanceof StructType) {
             return ATypeTag.OBJECT;
+        } else if (schema instanceof DecimalType) {
+            // Throws unless decimal-to-double is enabled; decimals are then surfaced as DOUBLE.
+            ensureDecimalToDoubleEnabled(schema, parserContext);
+            return ATypeTag.DOUBLE;
         } else {
             throw createUnsupportedException(schema);
         }
@@ -204,11 +213,30 @@
         } else if (schema instanceof StringType) {
             serializeString(row.getString(index), out);
         } else if (schema instanceof DateType) {
-            serializeDate(row.getInt(index), out);
+            int dateValue = row.getInt(index);
+            if (parserContext.isDateAsInt()) {
+                // Written as a 32-bit integer to agree with the INTEGER type tag reported for this option.
+                aInt32.setValue(dateValue);
+                int32Serde.serialize(aInt32, out);
+            } else {
+                parserContext.serializeDate(dateValue, out);
+            }
         } else if (schema instanceof TimestampType) {
-            serializeTimestamp(row.getLong(index), out);
+            // Delta timestamps are microseconds; datetimes and the time-zone offset are milliseconds.
+            // floorDiv keeps pre-epoch values on the millisecond that contains them.
+            long timestampMillis = Math.floorDiv(row.getLong(index), 1000L) + parserContext.getTimeZoneOffset();
+            if (parserContext.isTimestampAsLong()) {
+                serializeLong(timestampMillis, out);
+            } else {
+                parserContext.serializeDateTime(timestampMillis, out);
+            }
         } else if (schema instanceof TimestampNTZType) {
-            serializeTimestamp(row.getLong(index), out);
+            long timestampMillis = Math.floorDiv(row.getLong(index), 1000L); // microseconds -> milliseconds
+            if (parserContext.isTimestampAsLong()) {
+                serializeLong(timestampMillis, out);
+            } else {
+                parserContext.serializeDateTime(timestampMillis, out);
+            }
         } else if (schema instanceof StructType) {
             parseObject(row.getStruct(index), out);
         } else if (schema instanceof ArrayType) {
@@ -240,11 +260,30 @@
         } else if (schema instanceof StringType) {
             serializeString(column.getString(index), out);
         } else if (schema instanceof DateType) {
-            serializeDate(column.getInt(index), out);
+            int dateValue = column.getInt(index);
+            if (parserContext.isDateAsInt()) {
+                // Written as a 32-bit integer to agree with the INTEGER type tag reported for this option.
+                aInt32.setValue(dateValue);
+                int32Serde.serialize(aInt32, out);
+            } else {
+                parserContext.serializeDate(dateValue, out);
+            }
         } else if (schema instanceof TimestampType) {
-            serializeTimestamp(column.getLong(index), out);
+            // Delta timestamps are microseconds; datetimes and the time-zone offset are milliseconds.
+            // floorDiv keeps pre-epoch values on the millisecond that contains them.
+            long timestampMillis = Math.floorDiv(column.getLong(index), 1000L) + parserContext.getTimeZoneOffset();
+            if (parserContext.isTimestampAsLong()) {
+                serializeLong(timestampMillis, out);
+            } else {
+                parserContext.serializeDateTime(timestampMillis, out);
+            }
         } else if (schema instanceof TimestampNTZType) {
-            serializeTimestamp(column.getLong(index), out);
+            long timestampMillis = Math.floorDiv(column.getLong(index), 1000L); // microseconds -> milliseconds
+            if (parserContext.isTimestampAsLong()) {
+                serializeLong(timestampMillis, out);
+            } else {
+                parserContext.serializeDateTime(timestampMillis, out);
+            }
         } else if (schema instanceof ArrayType) {
             parseArray((ArrayType) schema, column.getArray(index), out);
         } else if (schema instanceof StructType) {
@@ -273,16 +304,6 @@
         stringSerde.serialize(aString, out);
     }

-    private void serializeDate(Object value, DataOutput out) throws 
HyracksDataException {
-        aInt32.setValue((Integer) value);
-        int32Serde.serialize(aInt32, out);
-    }
-
-    private void serializeTimestamp(Object value, DataOutput out) throws 
HyracksDataException {
-        aInt64.setValue(TimeUnit.MICROSECONDS.toMillis((Long) value));
-        int64Serde.serialize(aInt64, out);
-    }
-
     private void serializeDecimal(BigDecimal value, DataOutput out) throws 
HyracksDataException {
         serializeDouble(value.doubleValue(), out);
     }
@@ -290,4 +311,20 @@
     private static HyracksDataException createUnsupportedException(DataType schema) {
         return new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, "Delta Parser", schema.toString());
     }
+
+    /**
+     * Rejects a Delta DECIMAL type unless the decimal-to-double option is enabled; decimals are only supported by
+     * converting them to DOUBLE.
+     *
+     * @param type    the Delta type being mapped, reported in the error
+     * @param context converter context holding the parsed options
+     */
+    private static void ensureDecimalToDoubleEnabled(DataType type, DeltaConverterContext context) {
+        if (!context.isDecimalToDoubleEnabled()) {
+            // NOTE(review): reuses the Parquet exception type and error code, so the message may mention Parquet.
+            throw new AsterixParquetRuntimeException(new RuntimeDataException(
+                    ErrorCode.PARQUET_SUPPORTED_TYPE_WITH_OPTION, type.toString(),
+                    ExternalDataConstants.DeltaOptions.DECIMAL_TO_DOUBLE));
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java
index 5d4b2dd..95868c4 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java
@@ -60,7 +60,6 @@
     }

     private DeltaDataParser createParser(IExternalDataRuntimeContext context) {
-        return new DeltaDataParser(context);
+        return new DeltaDataParser(context, configuration);
     }
-
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index d8f89c2..23f627e 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -328,6 +328,23 @@
         WRITER_SUPPORTED_COMPRESSION = Set.of(KEY_COMPRESSION_GZIP);
     }

+    /**
+     * Option keys understood by the Delta table parser.
+     */
+    public static class DeltaOptions {
+        private DeltaOptions() {
+        }
+
+        public static final String PARSE_JSON_STRING = "parse-json-string";
+        public static final String DECIMAL_TO_DOUBLE = "decimal-to-double";
+        // NOTE(review): key names assumed to mirror the Parquet reader's timestamp/date options; confirm.
+        public static final String TIMESTAMP_AS_LONG = "timestamp-to-long";
+        public static final String DATE_AS_INT = "date-to-int";
+
+        public static final String TIMEZONE = "timezone";
+        public static final Set<String> VALID_TIME_ZONES = Set.of(TimeZone.getAvailableIDs());
+    }
+
     public static class ParquetOptions {
         private ParquetOptions() {
         }

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19068
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: goldfish
Gerrit-Change-Id: I6fa3d46588116508716fc1abd693f75ee5538d7f
Gerrit-Change-Number: 19068
Gerrit-PatchSet: 1
Gerrit-Owner: Ayush Tripathi <[email protected]>
Gerrit-MessageType: newchange

Reply via email to