From Ayush Tripathi <[email protected]>:

Ayush Tripathi has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19175 )


Change subject: [ASTERIXDB-3365][EXT] Revisiting Avro logical types
......................................................................

[ASTERIXDB-3365][EXT] Revisiting Avro logical types

- user model changes: yes
- storage format changes: no
- interface changes: yes

Details: Adds optional parameters that control how Avro logical types
are parsed.
- DECIMAL_TO_DOUBLE: Converts decimal values to double. (Default: false)
- UUID_AS_STRING: Converts UUID values to their string representation. (Default: true)
- DATE_AS_INT: Converts date values to int. (Default: true)
- TIME_AS_LONG: Converts time values to long. (Default: true)
- TIMESTAMP_AS_LONG: Converts timestamp values to long. (Default: true)
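
For reviewers, the following is a minimal, standalone sketch of what the raw
Avro values behind these logical types mean, which is roughly what the parser
has to decode when an *_AS_* option is set to false (or DECIMAL_TO_DOUBLE is
set to true). The class AvroLogicalTypeSketch and its method names are
hypothetical and are not part of this change; the sketch uses only the JDK,
assumes UTC, and ignores the optional time zone setting.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.UUID;

// Hypothetical helper for illustration only; not a class from this patch.
final class AvroLogicalTypeSketch {

    // decimal: the bytes hold a big-endian two's-complement unscaled integer;
    // the scale comes from the Avro schema.
    static double decimalToDouble(ByteBuffer raw, int scale) {
        byte[] bytes = new byte[raw.remaining()];
        raw.duplicate().get(bytes); // duplicate() leaves the caller's position untouched
        return new BigDecimal(new BigInteger(bytes), scale).doubleValue();
    }

    // date: an int counting days since 1970-01-01.
    static LocalDate date(int daysSinceEpoch) {
        return LocalDate.ofEpochDay(daysSinceEpoch);
    }

    // time-millis: an int counting milliseconds since midnight.
    static LocalTime timeMillis(int millisOfDay) {
        return LocalTime.ofNanoOfDay(millisOfDay * 1_000_000L);
    }

    // timestamp-millis: a long counting milliseconds since the epoch (UTC assumed here).
    static LocalDateTime timestampMillis(long epochMillis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
    }

    // uuid: a string that can be packed into 16 big-endian bytes.
    static byte[] uuidBytes(String text) {
        UUID uuid = UUID.fromString(text);
        return ByteBuffer.allocate(16)
                .putLong(uuid.getMostSignificantBits())
                .putLong(uuid.getLeastSignificantBits())
                .array();
    }

    public static void main(String[] args) {
        // Sample values taken from the test data generator in this change.
        System.out.println(decimalToDouble(ByteBuffer.wrap(new byte[] { 0x01, 0x02 }), 2)); // 2.58
        System.out.println(date(20061)); // 2024-12-04
        System.out.println(timeMillis(12345678)); // 03:25:45.678
        System.out.println(timestampMillis(1733344079083L)); // 2024-12-04T20:27:59.083
        System.out.println(uuidBytes("123e4567-e89b-12d3-a456-426614174000").length); // 16
    }
}
```

The printed values line up with the decimal, date, time-millis and
timestamp-millis fields in the expected results added by this change.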

Change-Id: I6ce5d7f7c3deac986abd416583133475d1f86f27
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AvroDataParserFactory.java
A asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-logical-types/avro-logical-types.03.adm
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.01.ddl.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.02.query.sqlpp
A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroConverterContext.java
A asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-logical-types/avro-logical-types.02.adm
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.03.query.sqlpp
A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroLogicalTypesExampleGenerator.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileConverterUtil.java
13 files changed, 559 insertions(+), 7 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/75/19175/1

diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
index c50b391..7c01957 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
@@ -420,6 +420,7 @@
         loadData(generatedDataBasePath, "", "avro_type.avro", definition, definitionSegment, false, false);
         loadData(generatedDataBasePath, "", "partition_heterogeneous.avro", definition, definitionSegment, false,
                 false);
+        loadData(generatedDataBasePath, "", "avro_logical_type.avro", definition, definitionSegment, false, false);
 
         Collection<File> files =
                 IoUtil.getMatchingFiles(Paths.get(generatedDataBasePath + "/external-filter"), AVRO_FILTER);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileConverterUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileConverterUtil.java
index db12e7b..4c13052 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileConverterUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileConverterUtil.java
@@ -107,5 +107,6 @@
             writeAvroFile(jsonFile, outputPath);
         }
         AvroFileExampleGeneratorUtil.writeExample();
+        AvroLogicalTypesExampleGenerator.writeLogicalTypesExample();
     }
 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroLogicalTypesExampleGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroLogicalTypesExampleGenerator.java
new file mode 100644
index 0000000..0d02d3c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroLogicalTypesExampleGenerator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.external_dataset.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.junit.Test;
+
+public class AvroLogicalTypesExampleGenerator {
+    private static final String SCHEMA_STRING = "{\n" + "  \"type\": \"record\",\n"
+            + "  \"name\": \"LogicalTypesRecord\",\n" + "  \"namespace\": \"com.example\",\n" + "  \"fields\": [\n"
+            + "    { \"name\": \"decimalField\", \"type\": { \"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 2 } },\n"
+            + "    { \"name\": \"uuidField\", \"type\": { \"type\": \"string\", \"logicalType\": \"uuid\" } },\n"
+            + "    { \"name\": \"dateField\", \"type\": { \"type\": \"int\", \"logicalType\": \"date\" } },\n"
+            + "    { \"name\": \"timeMillisField\", \"type\": { \"type\": \"int\", \"logicalType\": \"time-millis\" } },\n"
+            + "    { \"name\": \"timeMicrosField\", \"type\": { \"type\": \"long\", \"logicalType\": \"time-micros\" } },\n"
+            + "    { \"name\": \"timestampMillisField\", \"type\": { \"type\": \"long\", \"logicalType\": \"timestamp-millis\" } },\n"
+            + "    { \"name\": \"timestampMicrosField\", \"type\": { \"type\": \"long\", \"logicalType\": \"timestamp-micros\" } },\n"
+            + "    { \"name\": \"localTimestampMillisField\", \"type\": { \"type\": \"long\", \"logicalType\": \"local-timestamp-millis\" } },\n"
+            + "    { \"name\": \"localTimestampMicrosField\", \"type\": { \"type\": \"long\", \"logicalType\": \"local-timestamp-micros\" } }\n"
+            + "  ]\n" + "}";
+
+    private static final String AVRO_GEN_BASEDIR = "target/generated_avro_files";
+    private static final String FILE_NAME = "avro_logical_type.avro";
+
+    public static void writeLogicalTypesExample() throws IOException {
+        Schema schema = new Schema.Parser().parse(SCHEMA_STRING);
+        File destPath = new File(AVRO_GEN_BASEDIR);
+        if (!destPath.exists()) {
+            destPath.mkdirs();
+        }
+        File outputFile = new File(destPath, FILE_NAME);
+
+        DatumWriter<GenericRecord> datumWriter = new SpecificDatumWriter<>(schema);
+        try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
+            dataFileWriter.create(schema, outputFile);
+
+            // First record
+            GenericRecord record = new GenericData.Record(schema);
+            record.put("decimalField", ByteBuffer.wrap(new byte[] { 0x01, 0x02 }));
+            record.put("uuidField", "123e4567-e89b-12d3-a456-426614174000");
+            record.put("dateField", (int) (Instant.ofEpochMilli(1733344079083L).getEpochSecond() / (24 * 60 * 60)));
+            record.put("timeMillisField", 12345678);
+            record.put("timeMicrosField", 12345678901234L);
+            record.put("timestampMillisField", 1733344079083L);
+            record.put("timestampMicrosField", 1733344079083000L);
+            record.put("localTimestampMillisField", 1733344079083L);
+            record.put("localTimestampMicrosField", 1733344079083000L);
+
+            dataFileWriter.append(record);
+        } catch (IOException e) {
+            System.err.println("Failed to write AVRO file: " + e.getMessage());
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void main() throws IOException {
+        writeLogicalTypesExample();
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.01.ddl.sqlpp
new file mode 100644
index 0000000..73d8056
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.01.ddl.sqlpp
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+
+CREATE TYPE AvroType as {
+};
+
+CREATE EXTERNAL DATASET AvroDataset(AvroType) USING %adapter%
+(
+  %template%,
+  ("container"="playground"),
+  ("definition"="avro-data/reviews"),
+  ("include"="*avro_logical_type.avro"),
+  ("decimal-to-double"="true"),
+  ("uuid-to-string"="true"),
+  ("date-to-int"="true"),
+  ("time-to-long"="true"),
+  ("timestamp-to-long"="true"),
+  ("format" = "avro")
+);
+
+CREATE EXTERNAL DATASET AvroDataset2(AvroType) USING %adapter%
+(
+  %template%,
+  ("container"="playground"),
+  ("definition"="avro-data/reviews"),
+  ("include"="*avro_logical_type.avro"),
+  ("decimal-to-double"="true"),
+  ("uuid-to-string"="false"),
+  ("date-to-int"="false"),
+  ("time-to-long"="false"),
+  ("timestamp-to-long"="false"),
+  ("format" = "avro")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.02.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.02.query.sqlpp
new file mode 100644
index 0000000..8a72adb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.02.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+SELECT VALUE a
+FROM AvroDataset a;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.03.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.03.query.sqlpp
new file mode 100644
index 0000000..b927aba
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.03.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+SELECT VALUE a
+FROM AvroDataset2 a;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-logical-types/avro-logical-types.02.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-logical-types/avro-logical-types.02.adm
new file mode 100644
index 0000000..7690b33
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-logical-types/avro-logical-types.02.adm
@@ -0,0 +1 @@
+{ "decimalField": 2.58, "uuidField": "123e4567-e89b-12d3-a456-426614174000", "dateField": 20061, "timeMillisField": 12345678, "timeMicrosField": -539222987, "timestampMillisField": 1733344079083, "timestampMicrosField": 1733344079083, "localTimestampMillisField": 1733344079083, "localTimestampMicrosField": 1733344079083 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-logical-types/avro-logical-types.03.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-logical-types/avro-logical-types.03.adm
new file mode 100644
index 0000000..51eedae
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-logical-types/avro-logical-types.03.adm
@@ -0,0 +1 @@
+{ "decimalField": 2.58, "uuidField": uuid("123e4567-e89b-12d3-a456-426614174000"), "dateField": date("2024-12-04"), "timeMillisField": time("03:25:45.678"), "timeMicrosField": time("18:12:57.013"), "timestampMillisField": datetime("2024-12-04T20:27:59.083"), "timestampMicrosField": datetime("2024-12-04T20:27:59.083"), "localTimestampMillisField": datetime("2024-12-04T20:27:59.083"), "localTimestampMicrosField": datetime("2024-12-04T20:27:59.083") }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index d840527..5fa6f46 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -353,6 +353,12 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="external-dataset">
+      <compilation-unit name="common/avro/avro-logical-types">
+        <placeholder name="adapter" value="S3" />
+        <output-dir compare="Text">common/avro/avro-logical-types</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
       <compilation-unit name="common/avro/avro-types/avro-nested-records">
         <placeholder name="adapter" value="S3" />
         <output-dir compare="Text">common/avro/avro-types/avro-nested-records</output-dir>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroConverterContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroConverterContext.java
new file mode 100644
index 0000000..3afc4ee
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroConverterContext.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.stream;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.UUID;
+
+import org.apache.asterix.external.parser.jackson.ParserContext;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.ADate;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.AMutableDate;
+import org.apache.asterix.om.base.AMutableDateTime;
+import org.apache.asterix.om.base.AMutableDouble;
+import org.apache.asterix.om.base.AMutableTime;
+import org.apache.asterix.om.base.ATime;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.parquet.io.api.Binary;
+
+public class AvroConverterContext extends ParserContext {
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<ADate> dateSerDer =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATE);
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<ADateTime> datetimeSerDer =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATETIME);
+    private final ISerializerDeserializer<ATime> timeSerDer =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ATIME);
+    private final ISerializerDeserializer<ADouble> doubleSerDer =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+    private final boolean decimalToDouble;
+    private final boolean timestampAsLong;
+    private final boolean dateAsInt;
+    private final boolean timeAsLong;
+    private final boolean uuidAsString;
+
+    private final int timeZoneOffset;
+    private final AMutableDate mutableDate = new AMutableDate(0);
+    private final AMutableDateTime mutableDateTime = new AMutableDateTime(0);
+    private final AMutableDouble mutableDouble = new AMutableDouble(0.0);
+    private final AMutableTime mutableTime = new AMutableTime(0);
+    private final List<Warning> warnings;
+
+    public AvroConverterContext(Map<String, String> configuration, List<Warning> warnings) {
+        this.warnings = warnings;
+        decimalToDouble = Boolean.parseBoolean(configuration
+                .getOrDefault(ExternalDataConstants.AvroOptions.DECIMAL_TO_DOUBLE, ExternalDataConstants.FALSE));
+        timestampAsLong = Boolean.parseBoolean(configuration
+                .getOrDefault(ExternalDataConstants.AvroOptions.TIMESTAMP_AS_LONG, ExternalDataConstants.TRUE));
+        dateAsInt = Boolean.parseBoolean(
+                configuration.getOrDefault(ExternalDataConstants.AvroOptions.DATE_AS_INT, ExternalDataConstants.TRUE));
+        timeAsLong = Boolean.parseBoolean(
+                configuration.getOrDefault(ExternalDataConstants.AvroOptions.TIME_AS_LONG, ExternalDataConstants.TRUE));
+        uuidAsString = Boolean.parseBoolean(configuration.getOrDefault(ExternalDataConstants.AvroOptions.UUID_AS_STRING,
+                ExternalDataConstants.TRUE));
+        String configuredTimeZoneId = configuration.get(ExternalDataConstants.AvroOptions.TIMEZONE);
+        if (configuredTimeZoneId != null && !configuredTimeZoneId.isEmpty()) {
+            timeZoneOffset = TimeZone.getTimeZone(configuredTimeZoneId).getRawOffset();
+        } else {
+            timeZoneOffset = 0;
+        }
+    }
+
+    public void serializeDate(Object value, DataOutput output) {
+        try {
+            int intValue = (int) ((Number) value).longValue();
+            mutableDate.setValue(intValue);
+            dateSerDer.serialize(mutableDate, output);
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public void serializeDateTime(long timestamp, DataOutput output) {
+        try {
+            mutableDateTime.setValue(timestamp);
+            datetimeSerDer.serialize(mutableDateTime, output);
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public void serializeTime(int value, DataOutput output) {
+        try {
+            mutableTime.setValue(value);
+            timeSerDer.serialize(mutableTime, output);
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public void serializeDecimal(Object value, DataOutput output, int scale) throws IOException {
+        if (value instanceof ByteBuffer) {
+            ByteBuffer byteBuffer = (ByteBuffer) value;
+            byte[] bytes = new byte[byteBuffer.remaining()];
+            byteBuffer.get(bytes);
+            BigInteger unscaledValue = new BigInteger(bytes);
+            BigDecimal bigDecimal = new BigDecimal(unscaledValue, scale);
+            serializeDouble(bigDecimal.doubleValue(), output);
+        } else {
+            throw new IOException(
+                    "Expected ByteBuffer for Decimal logical type, but got: " + value.getClass().getName());
+        }
+    }
+
+    public void serializeDouble(double value, DataOutput output) {
+        try {
+            mutableDouble.setValue(value);
+            doubleSerDer.serialize(mutableDouble, output);
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public void serializeUUID(Object value, DataOutput output) {
+        String uuidValue = value.toString();
+        UUID uuid = UUID.fromString(uuidValue);
+        byte[] uuidBytes = new byte[16];
+        long mostSigBits = uuid.getMostSignificantBits();
+        long leastSigBits = uuid.getLeastSignificantBits();
+        for (int i = 0; i < 8; i++) {
+            uuidBytes[i] = (byte) (mostSigBits >>> (8 * (7 - i)));
+        }
+        for (int i = 0; i < 8; i++) {
+            uuidBytes[8 + i] = (byte) (leastSigBits >>> (8 * (7 - i)));
+        }
+        Binary binaryUuid = Binary.fromReusedByteArray(uuidBytes);
+        try {
+            output.writeByte(ATypeTag.UUID.serialize());
+            binaryUuid.writeTo(output);
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public boolean isDecimalToDoubleEnabled() {
+        return decimalToDouble;
+    }
+
+    public int getTimeZoneOffset() {
+        return timeZoneOffset;
+    }
+
+    public boolean isTimestampAsLong() {
+        return timestampAsLong;
+    }
+
+    public boolean isDateAsInt() {
+        return dateAsInt;
+    }
+
+    public boolean isUuidAsString() {
+        return uuidAsString;
+    }
+
+    public boolean isTimeAsLong() {
+        return timeAsLong;
+    }
+
+    public List<Warning> getWarnings() {
+        return warnings;
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
index c3563cc..deec7a8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
@@ -24,9 +24,11 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;

 import org.apache.asterix.builders.IARecordBuilder;
 import org.apache.asterix.builders.IAsterixListBuilder;
@@ -36,26 +38,31 @@
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
-import org.apache.asterix.external.parser.jackson.ParserContext;
+import org.apache.asterix.external.input.record.reader.stream.AvroConverterContext;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.om.base.ABoolean;
 import org.apache.asterix.om.base.ANull;
 import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
 import org.apache.hyracks.data.std.api.IMutableValueStorage;
 import org.apache.hyracks.data.std.api.IValueReference;

 public class AvroDataParser extends AbstractDataParser implements IRecordDataParser<GenericRecord> {
-    private final ParserContext parserContext;
+    private final AvroConverterContext parserContext;
     private final IExternalFilterValueEmbedder valueEmbedder;

-    public AvroDataParser(IExternalDataRuntimeContext context) {
-        parserContext = new ParserContext();
+    public AvroDataParser(IExternalDataRuntimeContext context, Map<String, String> conf) {
+        List<Warning> warnings = new ArrayList<>();
+        parserContext = new AvroConverterContext(conf, warnings);
         valueEmbedder = context.getValueEmbedder();
     }

@@ -154,8 +161,9 @@
         List<Schema> possibleTypes = unionSchema.getTypes();
         for (Schema possibleType : possibleTypes) {
             Schema.Type schemaType = possibleType.getType();
+            LogicalType logicalType = possibleType.getLogicalType();
             if (schemaType != NULL) {
-                if (matchesType(value, schemaType)) {
+                if (matchesType(value, schemaType, logicalType)) {
                     return possibleType;
                 }
             }
@@ -163,7 +171,7 @@
         return null;
     }

-    private boolean matchesType(Object value, Schema.Type schemaType) {
+    private boolean matchesType(Object value, Schema.Type schemaType, LogicalType logicalType) {
         switch (schemaType) {
             case INT:
                 return value instanceof Integer;
@@ -192,6 +200,41 @@

     private ATypeTag getTypeTag(Schema schema, Object value) throws HyracksDataException {
         Schema.Type schemaType = schema.getType();
+        LogicalType logicalType = schema.getLogicalType();
+        if (logicalType instanceof LogicalTypes.Uuid) {
+            if (parserContext.isUuidAsString()) {
+                return ATypeTag.STRING;
+            }
+            return ATypeTag.UUID;
+        }
+        if (logicalType instanceof LogicalTypes.Decimal) {
+            ensureDecimalToDoubleEnabled(logicalType, parserContext);
+            return ATypeTag.DOUBLE;
+        } else if (logicalType instanceof LogicalTypes.Date) {
+            if (parserContext.isDateAsInt()) {
+                return ATypeTag.INTEGER;
+            }
+            return ATypeTag.DATE;
+        } else if (logicalType instanceof LogicalTypes.TimeMicros) {
+            if (parserContext.isTimeAsLong()) {
+                return ATypeTag.BIGINT;
+            }
+            return ATypeTag.TIME;
+        } else if (logicalType instanceof LogicalTypes.TimeMillis) {
+            if (parserContext.isTimeAsLong()) {
+                return ATypeTag.INTEGER;
+            }
+            return ATypeTag.TIME;
+        } else if (logicalType instanceof LogicalTypes.TimestampMicros
+                || logicalType instanceof LogicalTypes.TimestampMillis
+                || logicalType instanceof LogicalTypes.LocalTimestampMicros
+                || logicalType instanceof LogicalTypes.LocalTimestampMillis) {
+            if (parserContext.isTimestampAsLong()) {
+                return ATypeTag.BIGINT;
+            }
+            return ATypeTag.DATETIME;
+        }
+
         if (value == null) {
             // The 'value' is missing
             return ATypeTag.MISSING;
@@ -228,8 +271,73 @@
         }
     }

+    private void parseLogicalValue(LogicalType logicalType, Object value, DataOutput out) throws IOException {
+        if (logicalType instanceof LogicalTypes.Uuid) {
+            if (parserContext.isUuidAsString()) {
+                serializeString(value, out);
+            } else {
+                parserContext.serializeUUID(value, out);
+            }
+        }
+        if (logicalType instanceof LogicalTypes.Decimal) {
+            LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
+            int scale = decimalType.getScale();
+            parserContext.serializeDecimal(value, out, scale);
+        } else if (logicalType instanceof LogicalTypes.Date) {
+            if (parserContext.isDateAsInt()) {
+                serializeLong(value, out);
+            } else {
+                parserContext.serializeDate(value, out);
+            }
+        } else if (logicalType instanceof LogicalTypes.TimeMicros) {
+            int timeInMillis = (int) TimeUnit.MICROSECONDS.toMillis(((Number) value).longValue());
+            int offset = parserContext.getTimeZoneOffset();
+            timeInMillis = timeInMillis + offset;
+            if (parserContext.isTimeAsLong()) {
+                serializeInt(timeInMillis, out);
+            } else {
+                parserContext.serializeTime(timeInMillis, out);
+            }
+        } else if (logicalType instanceof LogicalTypes.TimeMillis) {
+            int timeInMillis = ((Number) value).intValue();
+            int offset = parserContext.getTimeZoneOffset();
+            timeInMillis = timeInMillis + offset;
+            if (parserContext.isTimeAsLong()) {
+                serializeLong(timeInMillis, out);
+            } else {
+                parserContext.serializeTime(timeInMillis, out);
+            }
+        } else if (logicalType instanceof LogicalTypes.TimestampMicros
+                || logicalType instanceof LogicalTypes.LocalTimestampMicros) {
+            long timeStampInMicros = ((Number) value).longValue();
+            int offset = parserContext.getTimeZoneOffset();
+            long timeStampInMillis = TimeUnit.MICROSECONDS.toMillis(timeStampInMicros);
+            timeStampInMillis = timeStampInMillis + offset;
+            if (parserContext.isTimestampAsLong()) {
+                serializeLong(timeStampInMillis, out);
+            } else {
+                parserContext.serializeDateTime(timeStampInMillis, out);
+            }
+        } else if (logicalType instanceof LogicalTypes.TimestampMillis
+                || logicalType instanceof LogicalTypes.LocalTimestampMillis) {
+            long timeStampInMillis = ((Number) value).longValue();
+            int offset = parserContext.getTimeZoneOffset();
+            timeStampInMillis = timeStampInMillis + offset;
+            if (parserContext.isTimestampAsLong()) {
+                serializeLong(timeStampInMillis, out);
+            } else {
+                parserContext.serializeDateTime(timeStampInMillis, out);
+            }
+        }
+    }
+
     private void parseValue(Schema schema, Object value, DataOutput out) throws IOException {
         Schema.Type type = schema.getType();
+        LogicalType logicalType = schema.getLogicalType();
+        if (logicalType != null) {
+            parseLogicalValue(logicalType, value, out);
+            return;
+        }
         switch (type) {
             case RECORD:
                 parseObject((GenericRecord) value, out);
@@ -279,6 +387,12 @@
         int64Serde.serialize(aInt64, out);
     }

+    private void serializeInt(Object value, DataOutput out) throws HyracksDataException {
+        int intValue = ((Number) value).intValue();
+        aInt64.setValue(intValue);
+        int64Serde.serialize(aInt64, out);
+    }
+
     private void serializeDouble(Object value, DataOutput out) throws HyracksDataException {
         double doubleValue = ((Number) value).doubleValue();
         aDouble.setValue(doubleValue);
@@ -290,6 +404,14 @@
         stringSerde.serialize(aString, out);
     }

+    private static void ensureDecimalToDoubleEnabled(LogicalType type, AvroConverterContext context)
+            throws RuntimeDataException {
+        if (!context.isDecimalToDoubleEnabled()) {
+            throw new RuntimeDataException(ErrorCode.PARQUET_SUPPORTED_TYPE_WITH_OPTION, type.toString(),
+                    ExternalDataConstants.AvroOptions.DECIMAL_TO_DOUBLE);
+        }
+    }
+
     private static HyracksDataException createUnsupportedException(Schema schema) {
         return new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, "Avro Parser", schema);
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AvroDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AvroDataParserFactory.java
index 90fcc1a..c178401 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AvroDataParserFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AvroDataParserFactory.java
@@ -59,7 +59,7 @@
     }

     private AvroDataParser createParser(IExternalDataRuntimeContext context) {
-        return new AvroDataParser(context);
+        return new AvroDataParser(context, configuration);
     }

 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index ffe75cb..e7a4ae0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -327,6 +327,23 @@
         WRITER_SUPPORTED_COMPRESSION = Set.of(KEY_COMPRESSION_GZIP);
     }

+    public static class AvroOptions {
+        private AvroOptions() {
+        }
+
+        // - DECIMAL_TO_DOUBLE: Convert decimal to double (default: false)
+        // - UUID_AS_STRING: Convert UUID to string (default: true)
+        // - DATE_AS_INT: Convert date to integer (default: true)
+        // - TIME_AS_LONG: Convert time to long (default: true)
+        // - TIMESTAMP_AS_LONG: Convert timestamp to long (default: true)
+        public static final String DECIMAL_TO_DOUBLE = "decimal-to-double";
+        public static final String UUID_AS_STRING = "uuid-to-string";
+        public static final String DATE_AS_INT = "date-to-int";
+        public static final String TIMEZONE = "timezone";
+        public static final String TIME_AS_LONG = "time-to-long";
+        public static final String TIMESTAMP_AS_LONG = "timestamp-to-long";
+    }
+
     public static class DeltaOptions {
         private DeltaOptions() {
         }

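The time and timestamp branches of parseLogicalValue in the diff above convert Avro microsecond values to milliseconds and then add the configured time zone offset. A minimal sketch of that arithmetic follows, assuming the offset is expressed in milliseconds and must be applied exactly once per value. The class AvroTimeSketch and its methods are hypothetical names used only for illustration and are not part of this change. The sketch does not wrap time-of-day values around midnight.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; not part of the AsterixDB code base.
final class AvroTimeSketch {
    private AvroTimeSketch() {
    }

    // Converts an Avro time-micros value (microseconds since midnight) to
    // milliseconds and applies the time zone offset exactly once.
    // Assumption: offsetMillis is already in milliseconds.
    static int timeMicrosToMillis(long micros, int offsetMillis) {
        long millis = TimeUnit.MICROSECONDS.toMillis(micros);
        // toIntExact fails loudly instead of silently truncating on overflow.
        return Math.toIntExact(millis + offsetMillis);
    }

    // Converts an Avro timestamp-micros value (microseconds since the epoch)
    // to milliseconds and applies the same offset.
    static long timestampMicrosToMillis(long micros, int offsetMillis) {
        return TimeUnit.MICROSECONDS.toMillis(micros) + offsetMillis;
    }

    public static void main(String[] args) {
        int oneHourMillis = 3_600_000;
        // 1.5 seconds past midnight, shifted by one hour.
        System.out.println(timeMicrosToMillis(1_500_000L, oneHourMillis)); // prints 3601500
    }
}
```

Keeping the offset addition inside a single helper makes it harder to add the offset a second time at the call site, whichever of the as-long or typed serialization paths is taken afterwards.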
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19175
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: goldfish
Gerrit-Change-Id: I6ce5d7f7c3deac986abd416583133475d1f86f27
Gerrit-Change-Number: 19175
Gerrit-PatchSet: 1
Gerrit-Owner: Ayush Tripathi <[email protected]>
Gerrit-MessageType: newchange
