>From Ayush Tripathi <[email protected]>:
Ayush Tripathi has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19306 )
Change subject: [ASTERIXDB-3365][EXT] Revisiting Avro logical types
......................................................................
[ASTERIXDB-3365][EXT] Revisiting Avro logical types
- user model changes: yes
- storage format changes: no
- interface changes: yes
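Details: Add optional parameters that control how Avro logical types are parsed.
- DECIMAL_TO_DOUBLE ("decimal-to-double"): Converts decimal values to double. (Default: false)
- UUID_AS_STRING ("uuid-to-string"): Converts UUID values to their string representation. (Default: true)
- DATE_AS_INT ("date-to-int"): Converts date values to integers. (Default: true)
- TIME_AS_LONG ("time-to-long"): Converts time values to long. (Default: true)
- TIMESTAMP_AS_LONG ("timestamp-to-long"): Converts timestamp values to long. (Default: true)
- TIMEZONE ("timezone"): Time-zone ID whose offset is applied to time and timestamp values. (Default: none)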
Change-Id: I6ce5d7f7c3deac986abd416583133475d1f86f27
---
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AvroDataParserFactory.java
A
asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-logical-types/avro-logical-types.03.adm
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
M
asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.01.ddl.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.02.query.sqlpp
A
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroConverterContext.java
A
asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-logical-types/avro-logical-types.02.adm
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.03.query.sqlpp
A
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroLogicalTypesExampleGenerator.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileConverterUtil.java
13 files changed, 544 insertions(+), 5 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/06/19306/1
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
index c50b391..7c01957 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java
@@ -420,6 +420,7 @@
loadData(generatedDataBasePath, "", "avro_type.avro", definition,
definitionSegment, false, false);
loadData(generatedDataBasePath, "", "partition_heterogeneous.avro",
definition, definitionSegment, false,
false);
+ loadData(generatedDataBasePath, "", "avro_logical_type.avro",
definition, definitionSegment, false, false);
Collection<File> files =
IoUtil.getMatchingFiles(Paths.get(generatedDataBasePath +
"/external-filter"), AVRO_FILTER);
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileConverterUtil.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileConverterUtil.java
index db12e7b..4c13052 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileConverterUtil.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroFileConverterUtil.java
@@ -107,5 +107,6 @@
writeAvroFile(jsonFile, outputPath);
}
AvroFileExampleGeneratorUtil.writeExample();
+ AvroLogicalTypesExampleGenerator.writeLogicalTypesExample();
}
}
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroLogicalTypesExampleGenerator.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroLogicalTypesExampleGenerator.java
new file mode 100644
index 0000000..fd54627
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/avro/AvroLogicalTypesExampleGenerator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.external_dataset.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.junit.Test;
+
+/**
+ * Generates {@code avro_logical_type.avro}, a single-record Avro file whose fields exercise the Avro logical
+ * types (decimal, uuid, date, time-millis/micros, timestamp-millis/micros, local-timestamp-millis/micros).
+ * The file is written under {@code target/generated_avro_files}.
+ */
+public class AvroLogicalTypesExampleGenerator {
+    private static final String SCHEMA_STRING = "{\n" + " \"type\": \"record\",\n"
+            + " \"name\": \"LogicalTypesRecord\",\n" + " \"namespace\": \"com.example\",\n" + " \"fields\": [\n"
+            + " { \"name\": \"decimalField\", \"type\": { \"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 2 } },\n"
+            + " { \"name\": \"uuidField\", \"type\": { \"type\": \"string\", \"logicalType\": \"uuid\" } },\n"
+            + " { \"name\": \"dateField\", \"type\": { \"type\": \"int\", \"logicalType\": \"date\" } },\n"
+            + " { \"name\": \"timeMillisField\", \"type\": { \"type\": \"int\", \"logicalType\": \"time-millis\" } },\n"
+            + " { \"name\": \"timeMicrosField\", \"type\": { \"type\": \"long\", \"logicalType\": \"time-micros\" } },\n"
+            + " { \"name\": \"timestampMillisField\", \"type\": { \"type\": \"long\", \"logicalType\": \"timestamp-millis\" } },\n"
+            + " { \"name\": \"timestampMicrosField\", \"type\": { \"type\": \"long\", \"logicalType\": \"timestamp-micros\" } },\n"
+            + " { \"name\": \"localTimestampMillisField\", \"type\": { \"type\": \"long\", \"logicalType\": \"local-timestamp-millis\" } },\n"
+            + " { \"name\": \"localTimestampMicrosField\", \"type\": { \"type\": \"long\", \"logicalType\": \"local-timestamp-micros\" } }\n"
+            + " ]\n" + "}";
+
+    private static final String AVRO_GEN_BASEDIR = "target/generated_avro_files";
+    private static final String FILE_NAME = "avro_logical_type.avro";
+
+    /**
+     * Writes the example file, creating the output directory if needed.
+     *
+     * @throws IOException if the output directory cannot be created or the file cannot be written. Failures are
+     *                     propagated instead of printed so that missing test data is reported where it occurs.
+     */
+    public static void writeLogicalTypesExample() throws IOException {
+        Schema schema = new Schema.Parser().parse(SCHEMA_STRING);
+        File destPath = new File(AVRO_GEN_BASEDIR);
+        // mkdirs() returns false when the directory already exists, so confirm it is really there.
+        if (!destPath.mkdirs() && !destPath.isDirectory()) {
+            throw new IOException("Failed to create directory: " + destPath.getAbsolutePath());
+        }
+        File outputFile = new File(destPath, FILE_NAME);
+
+        DatumWriter<GenericRecord> datumWriter = new SpecificDatumWriter<>(schema);
+        try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
+            dataFileWriter.create(schema, outputFile);
+
+            GenericRecord record = new GenericData.Record(schema);
+            // Unscaled two's-complement value 0x0102 = 258; with scale 2 this is 2.58.
+            record.put("decimalField", ByteBuffer.wrap(new byte[] { 0x01, 0x02 }));
+            record.put("uuidField", "123e4567-e89b-12d3-a456-426614174000");
+            // Days since the Unix epoch (2024-12-04).
+            record.put("dateField", 20061);
+            record.put("timeMillisField", 12345678);
+            // NOTE(review): 12,345,678,901,234 us exceeds one day (86,400,000,000 us), so this is not a valid
+            // time-micros value; the expected results in avro-logical-types.02/.03.adm were produced from it.
+            record.put("timeMicrosField", 12345678901234L);
+            record.put("timestampMillisField", 1733344079083L);
+            record.put("timestampMicrosField", 1733344079083000L);
+            record.put("localTimestampMillisField", 1733344079083L);
+            record.put("localTimestampMicrosField", 1733344079083000L);
+
+            dataFileWriter.append(record);
+        }
+    }
+
+    /** Lets the generator be run on its own as a JUnit test. */
+    @Test
+    public void main() throws IOException {
+        writeLogicalTypesExample();
+    }
+}
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.01.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.01.ddl.sqlpp
new file mode 100644
index 0000000..73d8056
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.01.ddl.sqlpp
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+
+CREATE TYPE AvroType as {
+};
+
+/*
+ * AvroDataset: every logical-type conversion option is enabled, so uuid values
+ * are read as strings and date/time/timestamp values as integers.
+ */
+CREATE EXTERNAL DATASET AvroDataset(AvroType) USING %adapter%
+(
+ %template%,
+ ("container"="playground"),
+ ("definition"="avro-data/reviews"),
+ ("include"="*avro_logical_type.avro"),
+ ("decimal-to-double"="true"),
+ ("uuid-to-string"="true"),
+ ("date-to-int"="true"),
+ ("time-to-long"="true"),
+ ("timestamp-to-long"="true"),
+ ("format" = "avro")
+);
+
+/*
+ * AvroDataset2: the uuid/date/time/timestamp conversions are disabled, so those
+ * values keep their native ADM types. decimal-to-double stays enabled because the
+ * parser rejects decimal values when it is off.
+ */
+CREATE EXTERNAL DATASET AvroDataset2(AvroType) USING %adapter%
+(
+ %template%,
+ ("container"="playground"),
+ ("definition"="avro-data/reviews"),
+ ("include"="*avro_logical_type.avro"),
+ ("decimal-to-double"="true"),
+ ("uuid-to-string"="false"),
+ ("date-to-int"="false"),
+ ("time-to-long"="false"),
+ ("timestamp-to-long"="false"),
+ ("format" = "avro")
+);
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.02.query.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.02.query.sqlpp
new file mode 100644
index 0000000..8a72adb
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.02.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+/* Reads AvroDataset, where all Avro logical-type conversion options are enabled. */
+SELECT VALUE a
+FROM AvroDataset a;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.03.query.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.03.query.sqlpp
new file mode 100644
index 0000000..b927aba
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/avro/avro-logical-types/avro-logical-types.03.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+
+/* Reads AvroDataset2, where uuid/date/time/timestamp values keep their native ADM types. */
+SELECT VALUE a
+FROM AvroDataset2 a;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-logical-types/avro-logical-types.02.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-logical-types/avro-logical-types.02.adm
new file mode 100644
index 0000000..7690b33
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-logical-types/avro-logical-types.02.adm
@@ -0,0 +1 @@
+{ "decimalField": 2.58, "uuidField": "123e4567-e89b-12d3-a456-426614174000",
"dateField": 20061, "timeMillisField": 12345678, "timeMicrosField": -539222987,
"timestampMillisField": 1733344079083, "timestampMicrosField": 1733344079083,
"localTimestampMillisField": 1733344079083, "localTimestampMicrosField":
1733344079083 }
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-logical-types/avro-logical-types.03.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-logical-types/avro-logical-types.03.adm
new file mode 100644
index 0000000..51eedae
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/avro/avro-logical-types/avro-logical-types.03.adm
@@ -0,0 +1 @@
+{ "decimalField": 2.58, "uuidField":
uuid("123e4567-e89b-12d3-a456-426614174000"), "dateField": date("2024-12-04"),
"timeMillisField": time("03:25:45.678"), "timeMicrosField":
time("18:12:57.013"), "timestampMillisField":
datetime("2024-12-04T20:27:59.083"), "timestampMicrosField":
datetime("2024-12-04T20:27:59.083"), "localTimestampMillisField":
datetime("2024-12-04T20:27:59.083"), "localTimestampMicrosField":
datetime("2024-12-04T20:27:59.083") }
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index ff1b325..7e230da 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -564,6 +564,12 @@
</compilation-unit>
</test-case>
<test-case FilePath="external-dataset">
+ <compilation-unit name="common/avro/avro-logical-types">
+ <placeholder name="adapter" value="S3" />
+ <output-dir compare="Text">common/avro/avro-logical-types</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
<compilation-unit name="common/avro/avro-types/avro-nested-records">
<placeholder name="adapter" value="S3" />
<placeholder name="path_prefix" value="" />
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroConverterContext.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroConverterContext.java
new file mode 100644
index 0000000..50a38eb
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/AvroConverterContext.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.stream;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.asterix.external.parser.jackson.ParserContext;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.ADate;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.ADouble;
+import org.apache.asterix.om.base.AMutableDate;
+import org.apache.asterix.om.base.AMutableDateTime;
+import org.apache.asterix.om.base.AMutableDouble;
+import org.apache.asterix.om.base.AMutableTime;
+import org.apache.asterix.om.base.AMutableUUID;
+import org.apache.asterix.om.base.ATime;
+import org.apache.asterix.om.base.AUUID;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
+
+public class AvroConverterContext extends ParserContext {
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<ADate> dateSerDer =
+
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATE);
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<ADateTime> datetimeSerDer =
+
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATETIME);
+ private final ISerializerDeserializer<ATime> timeSerDer =
+
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ATIME);
+ private final ISerializerDeserializer<ADouble> doubleSerDer =
+
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+ protected ISerializerDeserializer<AUUID> uuidSerde =
+
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AUUID);
+
+ private final boolean decimalToDouble;
+ private final boolean timestampAsLong;
+ private final boolean dateAsInt;
+ private final boolean timeAsLong;
+ private final boolean uuidAsString;
+
+ private final int timeZoneOffset;
+ private final AMutableDate mutableDate = new AMutableDate(0);
+ private final AMutableDateTime mutableDateTime = new AMutableDateTime(0);
+ private final AMutableDouble mutableDouble = new AMutableDouble(0.0);
+ private final AMutableTime mutableTime = new AMutableTime(0);
+ protected AMutableUUID aUUID = new AMutableUUID();
+ private final List<Warning> warnings;
+
+ public AvroConverterContext(Map<String, String> configuration,
List<Warning> warnings) {
+ this.warnings = warnings;
+ decimalToDouble = Boolean.parseBoolean(configuration
+
.getOrDefault(ExternalDataConstants.AvroOptions.DECIMAL_TO_DOUBLE,
ExternalDataConstants.FALSE));
+ timestampAsLong = Boolean.parseBoolean(configuration
+
.getOrDefault(ExternalDataConstants.AvroOptions.TIMESTAMP_AS_LONG,
ExternalDataConstants.TRUE));
+ dateAsInt = Boolean.parseBoolean(
+
configuration.getOrDefault(ExternalDataConstants.AvroOptions.DATE_AS_INT,
ExternalDataConstants.TRUE));
+ timeAsLong = Boolean.parseBoolean(
+
configuration.getOrDefault(ExternalDataConstants.AvroOptions.TIME_AS_LONG,
ExternalDataConstants.TRUE));
+ uuidAsString =
Boolean.parseBoolean(configuration.getOrDefault(ExternalDataConstants.AvroOptions.UUID_AS_STRING,
+ ExternalDataConstants.TRUE));
+ String configuredTimeZoneId =
configuration.get(ExternalDataConstants.AvroOptions.TIMEZONE);
+ if (configuredTimeZoneId != null && !configuredTimeZoneId.isEmpty()) {
+ timeZoneOffset =
TimeZone.getTimeZone(configuredTimeZoneId).getRawOffset();
+ } else {
+ timeZoneOffset = 0;
+ }
+ }
+
+ public void serializeDate(Object value, DataOutput output) {
+ try {
+ int intValue = (int) ((Number) value).longValue();
+ mutableDate.setValue(intValue);
+ dateSerDer.serialize(mutableDate, output);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public void serializeDateTime(long timestamp, DataOutput output) {
+ try {
+ mutableDateTime.setValue(timestamp);
+ datetimeSerDer.serialize(mutableDateTime, output);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public void serializeTime(int value, DataOutput output) {
+ try {
+ mutableTime.setValue(value);
+ timeSerDer.serialize(mutableTime, output);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public void serializeDecimal(Object value, DataOutput output, int scale)
throws IOException {
+ if (value instanceof ByteBuffer) {
+ ByteBuffer byteBuffer = (ByteBuffer) value;
+ byte[] bytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(bytes);
+ BigInteger unscaledValue = new BigInteger(bytes);
+ BigDecimal bigDecimal = new BigDecimal(unscaledValue, scale);
+ serializeDouble(bigDecimal.doubleValue(), output);
+ } else {
+ throw new IOException(
+ "Expected ByteBuffer for Decimal logical type, but got: "
+ value.getClass().getName());
+ }
+ }
+
+ public void serializeDouble(double value, DataOutput output) {
+ try {
+ mutableDouble.setValue(value);
+ doubleSerDer.serialize(mutableDouble, output);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public void serializeUUID(Object value, DataOutput output) throws
HyracksDataException {
+ String uuidValue = value.toString();
+ char[] buffer = uuidValue.toCharArray();
+ aUUID.parseUUIDString(buffer, 0, uuidValue.length());
+ uuidSerde.serialize(aUUID, output);
+ }
+
+ public boolean isDecimalToDoubleEnabled() {
+ return decimalToDouble;
+ }
+
+ public int getTimeZoneOffset() {
+ return timeZoneOffset;
+ }
+
+ public boolean isTimestampAsLong() {
+ return timestampAsLong;
+ }
+
+ public boolean isDateAsInt() {
+ return dateAsInt;
+ }
+
+ public boolean isUuidAsString() {
+ return uuidAsString;
+ }
+
+ public boolean isTimeAsLong() {
+ return timeAsLong;
+ }
+
+ public List<Warning> getWarnings() {
+ return warnings;
+ }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
index c3563cc..c00d0f1 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/AvroDataParser.java
@@ -24,9 +24,11 @@
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.asterix.builders.IARecordBuilder;
import org.apache.asterix.builders.IAsterixListBuilder;
@@ -36,26 +38,31 @@
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordDataParser;
import
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
-import org.apache.asterix.external.parser.jackson.ParserContext;
+import
org.apache.asterix.external.input.record.reader.stream.AvroConverterContext;
+import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.om.base.ABoolean;
import org.apache.asterix.om.base.ANull;
import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
import org.apache.hyracks.data.std.api.IMutableValueStorage;
import org.apache.hyracks.data.std.api.IValueReference;
public class AvroDataParser extends AbstractDataParser implements
IRecordDataParser<GenericRecord> {
- private final ParserContext parserContext;
+ private final AvroConverterContext parserContext;
private final IExternalFilterValueEmbedder valueEmbedder;
- public AvroDataParser(IExternalDataRuntimeContext context) {
- parserContext = new ParserContext();
+ /**
+ * Creates a parser whose logical-type handling is driven by {@code conf}.
+ *
+ * @param context runtime context; supplies the external-filter value embedder
+ * @param conf dataset configuration, handed to {@link AvroConverterContext} to read the Avro logical-type options
+ */
+ public AvroDataParser(IExternalDataRuntimeContext context, Map<String,
String> conf) {
+ // NOTE(review): nothing in this change appends to or reads this list; confirm the warnings get reported.
+ List<Warning> warnings = new ArrayList<>();
+ parserContext = new AvroConverterContext(conf, warnings);
valueEmbedder = context.getValueEmbedder();
}
@@ -192,6 +199,41 @@
private ATypeTag getTypeTag(Schema schema, Object value) throws
HyracksDataException {
Schema.Type schemaType = schema.getType();
+ LogicalType logicalType = schema.getLogicalType();
+ if (logicalType instanceof LogicalTypes.Uuid) {
+ if (parserContext.isUuidAsString()) {
+ return ATypeTag.STRING;
+ }
+ return ATypeTag.UUID;
+ }
+ if (logicalType instanceof LogicalTypes.Decimal) {
+ ensureDecimalToDoubleEnabled(logicalType, parserContext);
+ return ATypeTag.DOUBLE;
+ } else if (logicalType instanceof LogicalTypes.Date) {
+ if (parserContext.isDateAsInt()) {
+ return ATypeTag.INTEGER;
+ }
+ return ATypeTag.DATE;
+ } else if (logicalType instanceof LogicalTypes.TimeMicros) {
+ if (parserContext.isTimeAsLong()) {
+ return ATypeTag.BIGINT;
+ }
+ return ATypeTag.TIME;
+ } else if (logicalType instanceof LogicalTypes.TimeMillis) {
+ if (parserContext.isTimeAsLong()) {
+ return ATypeTag.INTEGER;
+ }
+ return ATypeTag.TIME;
+ } else if (logicalType instanceof LogicalTypes.TimestampMicros
+ || logicalType instanceof LogicalTypes.TimestampMillis
+ || logicalType instanceof LogicalTypes.LocalTimestampMicros
+ || logicalType instanceof LogicalTypes.LocalTimestampMillis) {
+ if (parserContext.isTimestampAsLong()) {
+ return ATypeTag.BIGINT;
+ }
+ return ATypeTag.DATETIME;
+ }
+
if (value == null) {
// The 'value' is missing
return ATypeTag.MISSING;
@@ -228,8 +270,73 @@
}
}
+    /**
+     * Serializes {@code value}, whose schema carries {@code logicalType}, according to the logical-type options
+     * held by {@code parserContext}.
+     *
+     * <p>Time and timestamp values are normalized to milliseconds and shifted once by the configured time-zone
+     * offset. They are then written either as integers (the "*-to-long" options) or as ADM time/datetime values.
+     *
+     * <p>NOTE(review): getTypeTag reports INTEGER for date-to-int and for time-millis with time-to-long, but those
+     * branches call serializeLong, which appears to write a 64-bit value. Confirm that the tags and the encodings
+     * agree.
+     *
+     * @throws IOException if the value cannot be serialized, if a decimal is met while decimal-to-double is
+     *                     disabled, or if the logical type is not one this parser handles
+     */
+    private void parseLogicalValue(LogicalType logicalType, Object value, DataOutput out) throws IOException {
+        int offset = parserContext.getTimeZoneOffset();
+        if (logicalType instanceof LogicalTypes.Uuid) {
+            if (parserContext.isUuidAsString()) {
+                serializeString(value, out);
+            } else {
+                parserContext.serializeUUID(value, out);
+            }
+        } else if (logicalType instanceof LogicalTypes.Decimal) {
+            // Enforce the option here too, so decimals are never converted silently when it is off.
+            ensureDecimalToDoubleEnabled(logicalType, parserContext);
+            int scale = ((LogicalTypes.Decimal) logicalType).getScale();
+            parserContext.serializeDecimal(value, out, scale);
+        } else if (logicalType instanceof LogicalTypes.Date) {
+            if (parserContext.isDateAsInt()) {
+                serializeLong(value, out);
+            } else {
+                parserContext.serializeDate(value, out);
+            }
+        } else if (logicalType instanceof LogicalTypes.TimeMicros) {
+            int timeInMillis = (int) TimeUnit.MICROSECONDS.toMillis(((Number) value).longValue());
+            timeInMillis = timeInMillis + offset;
+            if (parserContext.isTimeAsLong()) {
+                serializeInt(timeInMillis, out);
+            } else {
+                // The offset is already applied above; it must not be added a second time here.
+                parserContext.serializeTime(timeInMillis, out);
+            }
+        } else if (logicalType instanceof LogicalTypes.TimeMillis) {
+            int timeInMillis = ((Number) value).intValue() + offset;
+            if (parserContext.isTimeAsLong()) {
+                serializeLong(timeInMillis, out);
+            } else {
+                parserContext.serializeTime(timeInMillis, out);
+            }
+        } else if (logicalType instanceof LogicalTypes.TimestampMicros
+                || logicalType instanceof LogicalTypes.LocalTimestampMicros) {
+            long timeStampInMillis = TimeUnit.MICROSECONDS.toMillis(((Number) value).longValue()) + offset;
+            if (parserContext.isTimestampAsLong()) {
+                serializeLong(timeStampInMillis, out);
+            } else {
+                parserContext.serializeDateTime(timeStampInMillis, out);
+            }
+        } else if (logicalType instanceof LogicalTypes.TimestampMillis
+                || logicalType instanceof LogicalTypes.LocalTimestampMillis) {
+            long timeStampInMillis = ((Number) value).longValue() + offset;
+            if (parserContext.isTimestampAsLong()) {
+                serializeLong(timeStampInMillis, out);
+            } else {
+                parserContext.serializeDateTime(timeStampInMillis, out);
+            }
+        } else {
+            // Previously nothing was written here, which left the record without this field's bytes.
+            // NOTE(review): falling back to the physical type in parseValue may be preferable to failing.
+            throw new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, "Avro Parser", logicalType.getName());
+        }
+    }
+
private void parseValue(Schema schema, Object value, DataOutput out)
throws IOException {
Schema.Type type = schema.getType();
+ LogicalType logicalType = schema.getLogicalType();
+ if (logicalType != null) {
+ parseLogicalValue(logicalType, value, out);
+ return;
+ }
switch (type) {
case RECORD:
parseObject((GenericRecord) value, out);
@@ -279,6 +386,12 @@
int64Serde.serialize(aInt64, out);
}
+    /**
+     * Narrows {@code value} (a {@link Number}) to {@code int} and writes it through the int64 serializer.
+     * Despite the name, the encoded form is the 64-bit integer type.
+     */
+    private void serializeInt(Object value, DataOutput out) throws HyracksDataException {
+        long narrowed = ((Number) value).intValue();
+        aInt64.setValue(narrowed);
+        int64Serde.serialize(aInt64, out);
+    }
+
private void serializeDouble(Object value, DataOutput out) throws
HyracksDataException {
double doubleValue = ((Number) value).doubleValue();
aDouble.setValue(doubleValue);
@@ -290,6 +403,14 @@
stringSerde.serialize(aString, out);
}
+    /**
+     * Fails when a decimal logical type is read while the Avro {@code decimal-to-double} option is disabled.
+     *
+     * @param type    the decimal logical type that was encountered
+     * @param context parser context holding the configured options
+     * @throws RuntimeDataException if decimal-to-double conversion is not enabled
+     */
+    private static void ensureDecimalToDoubleEnabled(LogicalType type, AvroConverterContext context)
+            throws RuntimeDataException {
+        if (!context.isDecimalToDoubleEnabled()) {
+            // getName() yields "decimal"; LogicalType does not override toString().
+            // NOTE(review): this reuses a Parquet-specific error code; an Avro/format-neutral code would produce a
+            // clearer message.
+            throw new RuntimeDataException(ErrorCode.PARQUET_SUPPORTED_TYPE_WITH_OPTION, type.getName(),
+                    ExternalDataConstants.AvroOptions.DECIMAL_TO_DOUBLE);
+        }
+    }
+
+ /** Builds the TYPE_UNSUPPORTED error, attributed to the "Avro Parser", for the given schema. */
private static HyracksDataException createUnsupportedException(Schema
schema) {
return new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, "Avro
Parser", schema);
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AvroDataParserFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AvroDataParserFactory.java
index 90fcc1a..c178401 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AvroDataParserFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/AvroDataParserFactory.java
@@ -59,7 +59,7 @@
}
private AvroDataParser createParser(IExternalDataRuntimeContext context) {
- return new AvroDataParser(context);
+ // Forward the configuration so the parser can honor the Avro logical-type options.
+ return new AvroDataParser(context, configuration);
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 46a1b5b..4cc1656 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -403,6 +403,23 @@
WRITER_SUPPORTED_QUOTES = List.of(DEFAULT_QUOTE, DEFAULT_SINGLE_QUOTE,
NONE);
}
+    /**
+     * Configuration keys read by the Avro parser to control how Avro logical types are converted.
+     */
+    public static class AvroOptions {
+        /** Convert decimal values to double (default: false); decimals are rejected while this is off. */
+        public static final String DECIMAL_TO_DOUBLE = "decimal-to-double";
+        /** Read UUID values as strings rather than ADM uuids (default: true). */
+        public static final String UUID_AS_STRING = "uuid-to-string";
+        /** Read date values as integers rather than ADM dates (default: true). */
+        public static final String DATE_AS_INT = "date-to-int";
+        /** Time-zone ID whose raw offset is added to time and timestamp values; unset means no shift. */
+        public static final String TIMEZONE = "timezone";
+        /** Read time values as integers rather than ADM times (default: true). */
+        public static final String TIME_AS_LONG = "time-to-long";
+        /** Read timestamp values as longs rather than ADM datetimes (default: true). */
+        public static final String TIMESTAMP_AS_LONG = "timestamp-to-long";
+
+        private AvroOptions() {
+        }
+    }
+
public static class DeltaOptions {
private DeltaOptions() {
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19306
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I6ce5d7f7c3deac986abd416583133475d1f86f27
Gerrit-Change-Number: 19306
Gerrit-PatchSet: 1
Gerrit-Owner: Ayush Tripathi <[email protected]>
Gerrit-MessageType: newchange