From Hussain Towaileb <[email protected]>:

Hussain Towaileb has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21314?usp=email )
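The change below does two things. It validates timezones with a case-insensitive lookup plus a `ZoneId.of` fallback. It also projects UTC-adjusted timestamps using plain `long` arithmetic and an offset cache bounded by zone transitions. What follows is a minimal, self-contained Java sketch of those ideas under stated assumptions:

- It covers only the millisecond path.
- It uses `IllegalArgumentException` in place of AsterixDB's `CompilationException`.
- All names (`EpochProjectionSketch`, `resolveZone`, `ZoneProjector`, `projectMillis`) are hypothetical illustrations loosely mirroring `ExternalDataUtils.resolveTimeZone` and `TimestampZoneProjector` from the diff. They are not the code under review.

```java
import java.time.DateTimeException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.zone.ZoneOffsetTransition;
import java.time.zone.ZoneRules;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;

// Hypothetical, self-contained sketch; class and method names are illustrative, not AsterixDB APIs.
public class EpochProjectionSketch {

    // Lower-cased region ID -> canonical ID, e.g. "africa/lagos" -> "Africa/Lagos".
    private static final Map<String, String> KNOWN_IDS = new HashMap<>();
    static {
        for (String id : TimeZone.getAvailableIDs()) {
            KNOWN_IDS.put(id.toLowerCase(Locale.ROOT), id);
        }
    }

    // Offset IDs such as "GMT+01:00" are not in TimeZone.getAvailableIDs(), so fall back to ZoneId.of().
    static String resolveZone(String configured) {
        String canonical = KNOWN_IDS.get(configured.toLowerCase(Locale.ROOT));
        if (canonical != null) {
            return canonical;
        }
        try {
            return ZoneId.of(configured).getId();
        } catch (DateTimeException e) {
            throw new IllegalArgumentException("Provided timezone is invalid: '" + configured + "'", e);
        }
    }

    // Exact (epochSecond, nano) -> epoch millis. nano is always in [0, 999_999_999], so pre-1970
    // values floor correctly; overflow throws ArithmeticException instead of silently wrapping.
    static long toMillis(long epochSecond, int nano) {
        return Math.addExact(Math.multiplyExact(epochSecond, 1_000L), nano / 1_000_000L);
    }

    // Shifts UTC epoch millis by the offset in effect at that instant, caching the offset
    // for the whole interval between the surrounding zone transitions.
    static final class ZoneProjector {
        private final ZoneRules rules;
        private long validFrom = Long.MIN_VALUE;
        private long validUntil = Long.MIN_VALUE; // validUntil == MIN_VALUE means the cache is empty
        private int cachedOffsetSeconds;

        ZoneProjector(ZoneId zoneId) {
            this.rules = zoneId.getRules();
        }

        long projectMillis(long utcEpochMillis) {
            long epochSecond = Math.floorDiv(utcEpochMillis, 1_000L);
            if (epochSecond < validFrom || epochSecond >= validUntil) {
                refresh(epochSecond);
            }
            return Math.addExact(utcEpochMillis, cachedOffsetSeconds * 1_000L);
        }

        private void refresh(long epochSecond) {
            Instant instant = Instant.ofEpochSecond(epochSecond);
            cachedOffsetSeconds = rules.getOffset(instant).getTotalSeconds();
            // +1 ns so a transition exactly at epochSecond is treated as the start of this interval.
            ZoneOffsetTransition prev = rules.previousTransition(Instant.ofEpochSecond(epochSecond, 1L));
            ZoneOffsetTransition next = rules.nextTransition(instant);
            validFrom = prev == null ? Long.MIN_VALUE : prev.getInstant().getEpochSecond();
            validUntil = next == null ? Long.MAX_VALUE : next.getInstant().getEpochSecond();
        }
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneId.of(resolveZone("GMT+01:00"));
        // A UTC-adjusted value, 2024-02-03T22:40:00Z, as it would come from an OffsetDateTime.
        long utcMillis = toMillis(LocalDateTime.of(2024, 2, 3, 22, 40).toEpochSecond(ZoneOffset.UTC), 0);
        long projected = new ZoneProjector(zone).projectMillis(utcMillis);
        System.out.println(Instant.ofEpochMilli(projected)); // prints 2024-02-03T23:40:00Z
    }
}
```

Caching the offset until the next `ZoneRules` transition means a zone like `Europe/Paris` recomputes its offset only when a record crosses a DST boundary. A fixed-offset ID such as `GMT+01:00` has no transitions, so its offset is computed once and reused.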
Change subject: [NO ISSUE][EXT]: fixes for timezone
......................................................................

[NO ISSUE][EXT]: fixes for timezone

Details:
- allow zone ID values such as GMT+01:00
- when an Iceberg timezone is passed, only update the UTC-adjusted timestamp; the wall-clock timestamp should remain untouched.
- snapshots should not be affected by the provided timezone.
- use arithmetic for date calculation to avoid creating objects unnecessarily.
- warn if a decimal-to-double conversion results in infinity

Ext-ref: MB-72261
Change-Id: I2887ab0d9a79bf555a38596fdf941ff88d2e7aab
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21314
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Hussain Towaileb <[email protected]>
Tested-by: Jenkins <[email protected]>
Tested-by: Hussain Towaileb <[email protected]>
Integration-Tests: Hussain Towaileb <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.010.ddl.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.050.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.060.query.sqlpp M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.999.ddl.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/negative/invalid-timezone/test.000.ddl.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/negative/invalid-timezone/test.010.ddl.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/negative/invalid-timezone/test.999.ddl.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.050.adm A asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.060.adm M 
asterixdb/asterix-app/src/test/resources/runtimets/testsuite_iceberg.xml M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/converter/IcebergConverterContext.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergSnapshotUtils.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java 15 files changed, 449 insertions(+), 51 deletions(-) Approvals: Hussain Towaileb: Looks good to me, but someone else must approve; Verified; Verified Jenkins: Verified; Verified Murtadha Hubail: Looks good to me, approved diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.010.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.010.ddl.sqlpp index ba010cd..d2cd914 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.010.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.010.ddl.sqlpp @@ -72,4 +72,42 @@ ("catalogName"="myNessieCatalog"), ("format"="parquet"), ("decimal-to-double"="true") +); + +CREATE TYPE allTypesTimesNotConvertedZoneId AS open {}; +CREATE EXTERNAL COLLECTION allTypesTimesNotConvertedZoneId(allTypesTimesNotConvertedZoneId) +USING S3 +( + ("table-format"="iceberg"), + ("namespace"="my_namespace"), + ("tableName"="allTypes"), + ("region"="custom-region"), + ("serviceEndpoint"="%DOCKER_S3_CONTAINER%"), + ("pathStyleAddressing"="true"), + ("catalogName"="myNessieCatalog"), + ("format"="parquet"), + ("decimal-to-double"="true"), + ("date-to-int"="false"), + ("time-to-int"="false"), + 
("timestamp-to-long"="false"), + ("timezone"="GMT+01:00") +); + +CREATE TYPE allTypesTimesNotConvertedZoneIdByName AS open {}; +CREATE EXTERNAL COLLECTION allTypesTimesNotConvertedZoneIdByName(allTypesTimesNotConvertedZoneIdByName) +USING S3 +( + ("table-format"="iceberg"), + ("namespace"="my_namespace"), + ("tableName"="allTypes"), + ("region"="custom-region"), + ("serviceEndpoint"="%DOCKER_S3_CONTAINER%"), + ("pathStyleAddressing"="true"), + ("catalogName"="myNessieCatalog"), + ("format"="parquet"), + ("decimal-to-double"="true"), + ("date-to-int"="false"), + ("time-to-int"="false"), + ("timestamp-to-long"="false"), + ("timezone"="Africa/Lagos") ); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.050.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.050.query.sqlpp new file mode 100644 index 0000000..fd6e649 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.050.query.sqlpp @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +SELECT + bool_field, + byte_field, + short_field, + int_field, + long_field, + float_field, + double_field, + decimal_field, + string_field, + varchar_field, + char_field, + uuid_field, + print_binary(binary_field, "base64") as binary_field, + print_binary(fixed_field, "base64") as fixed_field, + date_field, + time_field, + timestamp_field, + timestamp_ntz_field, + timestamp_nano_field, + interval_ym_field, + interval_dt_field, + print_binary(geometry_field, "base64") as geometry_field, + print_binary(geography_field, "base64") as geography_field, + struct_field, + list_field, + map_field, + variant_field, + unknown_field +FROM allTypesTimesNotConvertedZoneId; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.060.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.060.query.sqlpp new file mode 100644 index 0000000..3c57332 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.060.query.sqlpp @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +SELECT + bool_field, + byte_field, + short_field, + int_field, + long_field, + float_field, + double_field, + decimal_field, + string_field, + varchar_field, + char_field, + uuid_field, + print_binary(binary_field, "base64") as binary_field, + print_binary(fixed_field, "base64") as fixed_field, + date_field, + time_field, + timestamp_field, + timestamp_ntz_field, + timestamp_nano_field, + interval_ym_field, + interval_dt_field, + print_binary(geometry_field, "base64") as geometry_field, + print_binary(geography_field, "base64") as geography_field, + struct_field, + list_field, + map_field, + variant_field, + unknown_field +FROM allTypesTimesNotConvertedZoneIdByName; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.999.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.999.ddl.sqlpp index 6fa5c73..d0468b9 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.999.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.999.ddl.sqlpp @@ -20,7 +20,11 @@ DROP COLLECTION allTypesTimesConverted; DROP COLLECTION allTypesTimesNotConverted; DROP COLLECTION allTypesTimesDefaultExpectingNotConverted; +DROP COLLECTION allTypesTimesNotConvertedZoneId; +DROP COLLECTION allTypesTimesNotConvertedZoneIdByName; DROP TYPE allTypesTimesConverted; DROP TYPE allTypesTimesNotConverted; DROP TYPE allTypesTimesDefaultExpectingNotConverted; +DROP TYPE allTypesTimesNotConvertedZoneId; +DROP TYPE allTypesTimesNotConvertedZoneIdByName; DROP CATALOG myNessieCatalog; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/negative/invalid-timezone/test.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/negative/invalid-timezone/test.000.ddl.sqlpp new file 
mode 100644 index 0000000..e4a3976 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/negative/invalid-timezone/test.000.ddl.sqlpp @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +CREATE CATALOG myNessieCatalog +TYPE Iceberg +SOURCE NESSIE +WITH { + "uri": "%NESSIE_SERVER_URI%", + "warehouse": "s3://iceberg-container/nessie/warehouse" +}; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/negative/invalid-timezone/test.010.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/negative/invalid-timezone/test.010.ddl.sqlpp new file mode 100644 index 0000000..30fbbac --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/negative/invalid-timezone/test.010.ddl.sqlpp @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +DROP COLLECTION users IF EXISTS; +DROP TYPE users IF EXISTS; + +CREATE TYPE users AS open {}; +CREATE EXTERNAL COLLECTION badSnapshot(users) +USING S3 +( + ("table-format"="iceberg"), + ("namespace"="my_namespace"), + ("tableName"="users"), + ("region"="custom-region"), + ("serviceEndpoint"="%DOCKER_S3_CONTAINER%"), + ("pathStyleAddressing"="true"), + ("catalogName"="myNessieCatalog"), + ("format"="parquet"), + ("decimal-to-double"="true"), + ("date-to-int"="true"), + ("time-to-int"="true"), + ("timestamp-to-long"="true"), + ("timezone"="GMT+Zft!") +); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/negative/invalid-timezone/test.999.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/negative/invalid-timezone/test.999.ddl.sqlpp new file mode 100644 index 0000000..906b1e1 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/negative/invalid-timezone/test.999.ddl.sqlpp @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +DROP CATALOG myNessieCatalog; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.050.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.050.adm new file mode 100644 index 0000000..a6c1db5 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.050.adm @@ -0,0 +1 @@ +{ "binary_field": "AQIDBAUGBwg=", "fixed_field": "SGVsbG8gV29ybGQ=", "geometry_field": "AQIDBAUGBwgJCg==", "geography_field": "AQIDBAUGBwgJCg0=", "bool_field": true, "byte_field": 42, "short_field": 1000, "int_field": 42, "long_field": 9223372036854775807, "float_field": 3.14, "double_field": 2.718281828459045, "decimal_field": 12345.6789, "string_field": "hello world", "varchar_field": "varchar value one", "char_field": "Hi", "uuid_field": uuid("550e8400-e29b-41d4-a716-446655440000"), "date_field": date("2024-01-01"), "time_field": time("10:20:30.000"), "timestamp_field": datetime("2024-02-03T23:40:00.000"), "timestamp_ntz_field": datetime("2024-02-04T12:00:00.000"), "timestamp_nano_field": datetime("2024-02-03T23:40:00.000"), "interval_ym_field": 14, "interval_dt_field": 37230000000, "struct_field": { "name": "Alice", "age": 30, "active": true }, "list_field": [ "a", "b", "c" ], "map_field": { "key1": "value1", "key2": "100" }, 
"variant_field": "string value", "unknown_field": null } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.060.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.060.adm new file mode 100644 index 0000000..a6c1db5 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.060.adm @@ -0,0 +1 @@ +{ "binary_field": "AQIDBAUGBwg=", "fixed_field": "SGVsbG8gV29ybGQ=", "geometry_field": "AQIDBAUGBwgJCg==", "geography_field": "AQIDBAUGBwgJCg0=", "bool_field": true, "byte_field": 42, "short_field": 1000, "int_field": 42, "long_field": 9223372036854775807, "float_field": 3.14, "double_field": 2.718281828459045, "decimal_field": 12345.6789, "string_field": "hello world", "varchar_field": "varchar value one", "char_field": "Hi", "uuid_field": uuid("550e8400-e29b-41d4-a716-446655440000"), "date_field": date("2024-01-01"), "time_field": time("10:20:30.000"), "timestamp_field": datetime("2024-02-03T23:40:00.000"), "timestamp_ntz_field": datetime("2024-02-04T12:00:00.000"), "timestamp_nano_field": datetime("2024-02-03T23:40:00.000"), "interval_ym_field": 14, "interval_dt_field": 37230000000, "struct_field": { "name": "Alice", "age": 30, "active": true }, "list_field": [ "a", "b", "c" ], "map_field": { "key1": "value1", "key2": "100" }, "variant_field": "string value", "unknown_field": null } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_iceberg.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_iceberg.xml index 9797ccb..69564bd 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_iceberg.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_iceberg.xml @@ -97,5 +97,11 @@ <expected-error>ASX1108: External source error. unexpected TIMESTAMP snapshot format. 
Allow formats are: (milliseconds, yyyy-MM-dd or yyyy-MM-dd'T'HH:mm:ss). Found: 2024-12-12 11:11:11.123</expected-error> </compilation-unit> </test-case> + <test-case FilePath="iceberg/negative"> + <compilation-unit name="invalid-timezone"> + <output-dir compare="Text">invalid-timezone</output-dir> + <expected-error>ASX1172: Provided timezone is invalid: 'GMT+Zft!'</expected-error> + </compilation-unit> + </test-case> </test-group> </test-suite> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/converter/IcebergConverterContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/converter/IcebergConverterContext.java index c7713ac..7208851 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/converter/IcebergConverterContext.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/converter/IcebergConverterContext.java @@ -25,14 +25,11 @@ import static org.apache.asterix.external.util.ExternalDataConstants.IcebergOptions.TIME_AS_INT; import java.time.ZoneId; -import java.time.ZoneOffset; -import java.util.List; import java.util.Map; import java.util.TimeZone; import org.apache.asterix.external.parser.jackson.ParserContext; import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.hyracks.api.exceptions.Warning; public class IcebergConverterContext extends ParserContext { @@ -41,10 +38,8 @@ private final boolean timeAsInt; private final boolean timestampAsLong; private final ZoneId timeZoneId; - private final List<Warning> warnings; - public IcebergConverterContext(Map<String, String> configuration, List<Warning> warnings) { - this.warnings = warnings; + public IcebergConverterContext(Map<String, String> configuration) { decimalToDouble = 
Boolean.parseBoolean(configuration.getOrDefault(DECIMAL_TO_DOUBLE, FALSE)); dateAsInt = Boolean.parseBoolean(configuration.getOrDefault(DATE_AS_INT, FALSE)); timeAsInt = Boolean.parseBoolean(configuration.getOrDefault(TIME_AS_INT, FALSE)); @@ -54,7 +49,7 @@ if (configuredTimeZoneId != null && !configuredTimeZoneId.isEmpty()) { timeZoneId = TimeZone.getTimeZone(configuredTimeZoneId).toZoneId(); } else { - timeZoneId = ZoneOffset.UTC; + timeZoneId = null; } } @@ -77,8 +72,4 @@ public boolean isDateAsInt() { return dateAsInt; } - - public List<Warning> getWarnings() { - return warnings; - } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java index 79840f2..aaf37d0 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java @@ -31,8 +31,9 @@ import java.time.LocalTime; import java.time.OffsetDateTime; import java.time.ZoneId; -import java.time.temporal.ChronoUnit; -import java.util.ArrayList; +import java.time.ZoneOffset; +import java.time.zone.ZoneOffsetTransition; +import java.time.zone.ZoneRules; import java.util.List; import java.util.Map; import java.util.UUID; @@ -54,6 +55,7 @@ import org.apache.asterix.om.types.ATypeTag; import org.apache.avro.AvroRuntimeException; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.IWarningCollector; import org.apache.hyracks.api.exceptions.Warning; import org.apache.hyracks.data.std.api.IMutableValueStorage; import org.apache.hyracks.data.std.api.IValueReference; @@ -69,13 +71,17 @@ private final IcebergConverterContext parserContext; private final IExternalFilterValueEmbedder valueEmbedder; private final Schema 
projectedSchema; + private final TimestampZoneProjector timestampZoneProjector; + private final IWarningCollector warningCollector; + private boolean decimalOverflowWarned = false; public IcebergParquetDataParser(IExternalDataRuntimeContext context, Map<String, String> conf, Schema projectedSchema) { - List<Warning> warnings = new ArrayList<>(); - parserContext = new IcebergConverterContext(conf, warnings); + parserContext = new IcebergConverterContext(conf); valueEmbedder = context.getValueEmbedder(); this.projectedSchema = projectedSchema; + timestampZoneProjector = new TimestampZoneProjector(parserContext.getTimeZoneId()); + warningCollector = context.getTaskContext().getWarningCollector(); } @Override @@ -306,7 +312,13 @@ } private void serializeDecimal(BigDecimal value, DataOutput out) throws HyracksDataException { - serializeDouble(value.doubleValue(), out); + double doubleValue = value.doubleValue(); + if (warningCollector.shouldWarn() && !Double.isFinite(doubleValue) && !decimalOverflowWarned) { + warningCollector.warn(Warning.of(null, ErrorCode.EXTERNAL_SOURCE_ERROR, + "decimal value overflows double representation; infinity will be stored")); + decimalOverflowWarned = true; + } + serializeDouble(doubleValue, out); } private void serializeBinary(Object value, DataOutput out) throws HyracksDataException { @@ -350,29 +362,81 @@ } private void serializeTimestamp(Type type, Object value, DataOutput output) throws HyracksDataException { - Instant instant; switch (value) { - case OffsetDateTime offsetDateTime -> instant = offsetDateTime.toInstant(); - case LocalDateTime localDateTime -> { - ZoneId zoneId = parserContext.getTimeZoneId(); - instant = localDateTime.atZone(zoneId).toInstant(); - } + case LocalDateTime localDateTime -> serializeWallClockTimestamp(type, localDateTime, output); + case OffsetDateTime offsetDateTime -> serializeUtcAdjustedTimestamp(type, offsetDateTime, output); case null, default -> throw 
RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, value == null ? "unexpected null value for field type (" + type + ")" : "unexpected value type (" + value.getClass() + ") for field type (" + type + ")"); } + } + private void serializeWallClockTimestamp(Type type, LocalDateTime localDateTime, DataOutput output) + throws HyracksDataException { + long epochSecond = localDateTime.toEpochSecond(ZoneOffset.UTC); + int nano = localDateTime.getNano(); if (parserContext.isTimestampAsLong()) { - long timestampAsLong = type.typeId() == Type.TypeID.TIMESTAMP_NANO - ? ChronoUnit.NANOS.between(Instant.EPOCH, instant) - : ChronoUnit.MICROS.between(Instant.EPOCH, instant); - serializeLong(timestampAsLong, output); + long value = isTimestampNano(type) ? toNanos(epochSecond, nano) : toMicros(epochSecond, nano); + serializeLong(value, output); } else { - aDateTime.setValue(instant.toEpochMilli()); + aDateTime.setValue(toMillis(epochSecond, nano)); datetimeSerde.serialize(aDateTime, output); } } + private void serializeUtcAdjustedTimestamp(Type type, OffsetDateTime offsetDateTime, DataOutput output) + throws HyracksDataException { + long epochSecond = offsetDateTime.toEpochSecond(); + int nano = offsetDateTime.getNano(); + if (parserContext.isTimestampAsLong()) { + TimestampUnit unit = getTimestampUnit(type); + long value = unit == TimestampUnit.NANOS ? toNanos(epochSecond, nano) : toMicros(epochSecond, nano); + value = timestampZoneProjector.projectEpochValue(value, unit); + serializeLong(value, output); + } else { + long epochMillis = toMillis(epochSecond, nano); + epochMillis = timestampZoneProjector.projectEpochMillis(epochMillis); + aDateTime.setValue(epochMillis); + datetimeSerde.serialize(aDateTime, output); + } + } + + private static TimestampUnit getTimestampUnit(Type type) { + return isTimestampNano(type) ? 
TimestampUnit.NANOS : TimestampUnit.MICROS; + } + + private static boolean isTimestampNano(Type type) { + return type.typeId() == Type.TypeID.TIMESTAMP_NANO; + } + + private static long toMillis(long epochSecond, int nano) throws HyracksDataException { + try { + return Math.addExact(Math.multiplyExact(epochSecond, 1_000L), nano / 1_000_000L); + } catch (ArithmeticException ex) { + throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, + "timestamp value overflows long representation in milliseconds"); + } + } + + private static long toMicros(long epochSecond, int nano) throws HyracksDataException { + try { + return Math.addExact(Math.multiplyExact(epochSecond, 1_000_000L), nano / 1_000L); + } catch (ArithmeticException ex) { + throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, + "timestamp value overflows long representation in microseconds"); + } + } + + // Epoch nanoseconds stored in a long overflow outside roughly 1677-09-21 to 2262-04-11 + private static long toNanos(long epochSecond, int nano) throws HyracksDataException { + try { + return Math.addExact(Math.multiplyExact(epochSecond, 1_000_000_000L), nano); + } catch (ArithmeticException ex) { + throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, + "timestamp value overflows long representation in nanoseconds"); + } + } + private static HyracksDataException createUnsupportedException(Type type) { return new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, "Iceberg Parser", type.toString()); } @@ -429,4 +493,101 @@ default -> throw createUnsupportedException(type); }; } + + private enum TimestampUnit { + MILLIS, + MICROS, + NANOS + } + + // Not thread-safe by design: one instance per parser, parsers are per-task/per-thread. 
+ private static final class TimestampZoneProjector { + private static final long MILLIS_PER_SECOND = 1_000L; + private static final long MICROS_PER_SECOND = 1_000_000L; + private static final long NANOS_PER_SECOND = 1_000_000_000L; + + private final boolean enabled; + private final ZoneRules zoneRules; + private final boolean fixedOffsetZone; + private final int fixedOffsetSeconds; + + private long validFromEpochSecond = Long.MIN_VALUE; + private long validUntilEpochSecond = Long.MIN_VALUE; + private int cachedOffsetSeconds; + + private TimestampZoneProjector(ZoneId zoneId) { + enabled = zoneId != null; + + if (enabled) { + zoneRules = zoneId.getRules(); + fixedOffsetZone = zoneRules.isFixedOffset(); + fixedOffsetSeconds = fixedOffsetZone ? zoneRules.getOffset(Instant.EPOCH).getTotalSeconds() : 0; + } else { + zoneRules = null; + fixedOffsetZone = true; + fixedOffsetSeconds = 0; + } + } + + private long projectEpochMillis(long epochMillis) throws HyracksDataException { + return projectEpochValue(epochMillis, TimestampUnit.MILLIS); + } + + private long projectEpochValue(long epochValue, TimestampUnit unit) throws HyracksDataException { + if (!enabled) { + return epochValue; + } + + int offsetSeconds = getOffsetSeconds(epochValue, unit); + + try { + return switch (unit) { + case MILLIS -> Math.addExact(epochValue, offsetSeconds * MILLIS_PER_SECOND); + case MICROS -> Math.addExact(epochValue, offsetSeconds * MICROS_PER_SECOND); + case NANOS -> Math.addExact(epochValue, offsetSeconds * NANOS_PER_SECOND); + }; + } catch (ArithmeticException ex) { + throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, + "timestamp value overflows long representation after applying timezone configuration"); + } + } + + private int getOffsetSeconds(long epochValue, TimestampUnit unit) { + if (fixedOffsetZone) { + return fixedOffsetSeconds; + } + + long epochSecond = toEpochSecond(epochValue, unit); + + if (epochSecond >= validFromEpochSecond && epochSecond < 
validUntilEpochSecond) {
+                return cachedOffsetSeconds;
+            }
+
+            return refreshOffsetCache(epochSecond);
+        }
+
+        private int refreshOffsetCache(long epochSecond) {
+            Instant instant = Instant.ofEpochSecond(epochSecond);
+
+            ZoneOffset offset = zoneRules.getOffset(instant);
+            // Use epochSecond + 1ns so that if the record falls exactly on a transition boundary,
+            // previousTransition captures that transition as the start of the current offset period.
+            ZoneOffsetTransition previous = zoneRules.previousTransition(Instant.ofEpochSecond(epochSecond, 1L));
+            ZoneOffsetTransition next = zoneRules.nextTransition(instant);
+
+            validFromEpochSecond = previous == null ? Long.MIN_VALUE : previous.getInstant().getEpochSecond();
+            validUntilEpochSecond = next == null ? Long.MAX_VALUE : next.getInstant().getEpochSecond();
+            cachedOffsetSeconds = offset.getTotalSeconds();
+
+            return cachedOffsetSeconds;
+        }
+
+        private static long toEpochSecond(long epochValue, TimestampUnit unit) {
+            return switch (unit) {
+                case MILLIS -> Math.floorDiv(epochValue, MILLIS_PER_SECOND);
+                case MICROS -> Math.floorDiv(epochValue, MICROS_PER_SECOND);
+                case NANOS -> Math.floorDiv(epochValue, NANOS_PER_SECOND);
+            };
+        }
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 7eb1b9c..713a1fb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.external.util;
 
+import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
 import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE;
 import static org.apache.asterix.common.exceptions.ErrorCode.PARSE_ERROR;
 import static org.apache.asterix.common.exceptions.ErrorCode.PROPERTY_INVALID_VALUE_TYPE;
@@ -52,6 +53,7 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Base64;
@@ -60,9 +62,9 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.TimeZone;
 import java.util.function.BiPredicate;
 import java.util.regex.Matcher;
@@ -144,7 +146,7 @@
 
 public class ExternalDataUtils {
 
-    private static final Set<String> validTimeZones = Set.of(TimeZone.getAvailableIDs());
+    private static final Map<String, String> validTimeZonesMap = new HashMap<>();
     private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = new EnumMap<>(ATypeTag.class);
     private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024;
     private static final int HEADER_FUDGE = 64;
@@ -158,6 +160,9 @@
         valueParserFactoryMap.put(ATypeTag.BIGINT, LongParserFactory.INSTANCE);
         valueParserFactoryMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
         valueParserFactoryMap.put(ATypeTag.BOOLEAN, BooleanParserFactory.INSTANCE);
+        for (String id : TimeZone.getAvailableIDs()) {
+            validTimeZonesMap.put(id.toLowerCase(Locale.ROOT), id);
+        }
     }
 
     private ExternalDataUtils() {
@@ -539,10 +544,9 @@
             throw new CompilationException(ErrorCode.INVALID_DELTA_TABLE_FORMAT,
                     configuration.get(ExternalDataConstants.KEY_FORMAT));
         }
-        if (configuration.containsKey(ExternalDataConstants.DeltaOptions.TIMEZONE)
-                && !validTimeZones.contains(configuration.get(ExternalDataConstants.DeltaOptions.TIMEZONE))) {
-            throw new CompilationException(ErrorCode.INVALID_TIMEZONE,
-                    configuration.get(ExternalDataConstants.DeltaOptions.TIMEZONE));
+        if (configuration.containsKey(ExternalDataConstants.DeltaOptions.TIMEZONE)) {
+            String resolved = resolveTimeZone(configuration.get(ExternalDataConstants.DeltaOptions.TIMEZONE));
+            configuration.put(ExternalDataConstants.DeltaOptions.TIMEZONE, resolved);
         }
     }
 
@@ -1011,21 +1015,26 @@
         return isParquetFormat(properties) || isDeltaTable(properties) || isIcebergTable(properties);
     }
 
-    /**
-     * Validate Parquet dataset's declared type and configuration
-     *
-     * @param properties external dataset configuration
-     * @param datasetRecordType dataset declared type
-     */
+    public static String resolveTimeZone(String timeZoneId) throws CompilationException {
+        String canonical = validTimeZonesMap.get(timeZoneId.toLowerCase(Locale.ROOT));
+        if (canonical != null) {
+            return canonical;
+        }
+        try {
+            return ZoneId.of(timeZoneId).getId();
+        } catch (Exception e) {
+            throw CompilationException.create(ErrorCode.INVALID_TIMEZONE, e, timeZoneId);
+        }
+    }
+
     public static void validateParquetTypeAndConfiguration(Map<String, String> properties,
             ARecordType datasetRecordType) throws CompilationException {
         if (isParquetFormat(properties)) {
             if (datasetRecordType.getFieldTypes().length != 0) {
                 throw new CompilationException(ErrorCode.UNSUPPORTED_TYPE_FOR_PARQUET, datasetRecordType.getTypeName());
-            } else if (properties.containsKey(ParquetOptions.TIMEZONE)
-                    && !validTimeZones.contains(properties.get(ParquetOptions.TIMEZONE))) {
-                //Ensure the configured time zone id is correct
-                throw new CompilationException(ErrorCode.INVALID_TIMEZONE, properties.get(ParquetOptions.TIMEZONE));
+            } else if (properties.containsKey(ParquetOptions.TIMEZONE)) {
+                String resolved = resolveTimeZone(properties.get(ParquetOptions.TIMEZONE));
+                properties.put(ParquetOptions.TIMEZONE, resolved);
             }
         }
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergSnapshotUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergSnapshotUtils.java
index ab2ffa9..b2a2cdf 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergSnapshotUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergSnapshotUtils.java
@@ -28,7 +28,6 @@
 
 import java.time.LocalDate;
 import java.time.LocalDateTime;
-import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.time.format.DateTimeParseException;
 import java.util.Map;
@@ -36,7 +35,6 @@
 
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
@@ -49,8 +47,6 @@
     public static Optional<Long> validateAndGetSnapshot(Map<String, String> properties) throws CompilationException {
         String snapshotId = properties.get(ICEBERG_SNAPSHOT_ID_PROPERTY_KEY);
         String snapshotTimestamp = properties.get(ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY);
-        String timeZoneId = properties.get(ExternalDataConstants.IcebergOptions.TIMEZONE);
-        ZoneId zoneId = timeZoneId != null ? ZoneId.of(timeZoneId) : ZoneOffset.UTC;
         if (snapshotId != null && snapshotTimestamp != null) {
             throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT,
                     ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY, ICEBERG_SNAPSHOT_ID_PROPERTY_KEY);
@@ -60,7 +56,7 @@
             if (snapshotId != null) {
                 return Optional.of(Long.parseLong(snapshotId));
             } else if (snapshotTimestamp != null) {
-                return Optional.of(parseTimestamp(snapshotTimestamp, zoneId));
+                return Optional.of(parseTimestamp(snapshotTimestamp));
             } else {
                 return Optional.empty();
             }
@@ -70,7 +66,7 @@
         }
     }
 
-    private static long parseTimestamp(String timestamp, ZoneId zoneId) throws CompilationException {
+    private static long parseTimestamp(String timestamp) throws CompilationException {
         try {
             // try parsing as a long first
             return Long.parseLong(timestamp);
@@ -79,14 +75,14 @@
 
         try {
             // try parsing as ISO 8601 date, e.g., "yyyy-MM-dd"
-            return LocalDate.parse(timestamp, ISO_LOCAL_DATE).atStartOfDay(zoneId).toInstant().toEpochMilli();
+            return LocalDate.parse(timestamp, ISO_LOCAL_DATE).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
         } catch (DateTimeParseException ignored) {
         }
 
         try {
             // try parsing as ISO 8601 timestamp, e.g., "yyyy-MM-dd'T'HH:mm:ss"
             LocalDateTime localDateTime = LocalDateTime.parse(timestamp, ISO_LOCAL_DATE_TIME);
-            return localDateTime.atZone(zoneId).toInstant().toEpochMilli();
+            return localDateTime.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
         } catch (DateTimeParseException ignored) {
             throw CompilationException.create(EXTERNAL_SOURCE_ERROR,
                     "unexpected TIMESTAMP snapshot format. Allow formats are: (milliseconds, yyyy-MM-dd or "
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
index 6366275..9bd3e67 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
@@ -167,6 +167,13 @@
                     IcebergConstants.ICEBERG_NAMESPACE_PROPERTY_KEY);
         }
 
+        // if timezone is provided, validate it
+        String timezone = properties.get(ExternalDataConstants.ParquetOptions.TIMEZONE);
+        if (timezone != null && !timezone.isEmpty()) {
+            ExternalDataUtils.resolveTimeZone(timezone);
+        }
+
+        // validate snapshot
         IcebergSnapshotUtils.validateAndGetSnapshot(properties);
     }
 
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21314?usp=email

Gerrit-MessageType: merged
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: I2887ab0d9a79bf555a38596fdf941ff88d2e7aab
Gerrit-Change-Number: 21314
Gerrit-PatchSet: 6
Gerrit-Owner: Hussain Towaileb <[email protected]>
Gerrit-Reviewer: Hussain Towaileb <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-CC: Anon. E. Moose #1000171
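
The following is a minimal stand-alone sketch of the lookup that the new resolveTimeZone in ExternalDataUtils performs. It first tries a case-insensitive match against TimeZone.getAvailableIDs(). If that fails, it falls back to ZoneId.of, which accepts offset-style IDs such as GMT+01:00 and normalizes them. The class name TimeZoneResolverSketch is hypothetical. Where the patch throws CompilationException with ErrorCode.INVALID_TIMEZONE, the sketch throws IllegalArgumentException instead. It also catches DateTimeException, which is what ZoneId.of throws for malformed or unknown IDs, rather than Exception.

```java
import java.time.DateTimeException;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;

/**
 * Hypothetical sketch of a case-insensitive time zone resolver modelled on resolveTimeZone in the patch.
 */
public final class TimeZoneResolverSketch {
    // lower-cased zone ID -> canonical zone ID, built once
    private static final Map<String, String> CANONICAL_IDS = new HashMap<>();

    static {
        for (String id : TimeZone.getAvailableIDs()) {
            CANONICAL_IDS.put(id.toLowerCase(Locale.ROOT), id);
        }
    }

    private TimeZoneResolverSketch() {
    }

    /** Returns the canonical zone ID, or throws IllegalArgumentException if the ID is not usable. */
    public static String resolve(String timeZoneId) {
        String canonical = CANONICAL_IDS.get(timeZoneId.toLowerCase(Locale.ROOT));
        if (canonical != null) {
            return canonical;
        }
        try {
            // Accepts offsets ("+05:30") and prefixed offsets ("GMT+01:00", "UTC-3"), normalizing the offset part.
            return ZoneId.of(timeZoneId).getId();
        } catch (DateTimeException e) {
            throw new IllegalArgumentException("invalid time zone: " + timeZoneId, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve("utc"));              // UTC
        System.out.println(resolve("america/new_york")); // America/New_York
        System.out.println(resolve("GMT+01:00"));        // GMT+01:00
        System.out.println(resolve("UTC-3"));            // UTC-03:00
    }
}
```

In the Delta and Parquet paths, the patch writes the canonical ID back into the configuration via configuration.put / properties.put. As a result, later consumers see UTC rather than utc. The Iceberg path in IcebergUtils only validates the value and discards the result.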

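The first hunk shown above (refreshOffsetCache and toEpochSecond) avoids one ZoneRules lookup per value. It does this by remembering the UTC offset for the whole interval between two zone transitions. The sketch below reproduces that idea for epoch seconds only. Several details are assumptions, because the patch's field initialization and the caller of refreshOffsetCache are not shown here:

- the class name OffsetCacheSketch;
- the refreshCount counter;
- the empty initial window.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.zone.ZoneOffsetTransition;
import java.time.zone.ZoneRules;

/**
 * Hypothetical sketch: caches a zone's UTC offset for the interval between two transitions,
 * modelled on refreshOffsetCache in the patch. Handles epoch seconds only.
 */
public final class OffsetCacheSketch {
    private final ZoneRules zoneRules;
    // Empty window (from > until) so the first lookup always refreshes (assumed initialization).
    private long validFromEpochSecond = 1L;
    private long validUntilEpochSecond = 0L;
    private int cachedOffsetSeconds;
    private int refreshCount; // illustration only: counts cache misses

    public OffsetCacheSketch(ZoneId zoneId) {
        this.zoneRules = zoneId.getRules();
    }

    /** Returns the UTC offset in seconds at the given instant, reusing the cached window when possible. */
    public int offsetSeconds(long epochSecond) {
        if (epochSecond >= validFromEpochSecond && epochSecond < validUntilEpochSecond) {
            return cachedOffsetSeconds;
        }
        return refresh(epochSecond);
    }

    private int refresh(long epochSecond) {
        refreshCount++;
        Instant instant = Instant.ofEpochSecond(epochSecond);
        // previousTransition is exclusive; the +1 ns nudge makes a transition at exactly
        // epochSecond count as the start of the current window.
        ZoneOffsetTransition previous = zoneRules.previousTransition(Instant.ofEpochSecond(epochSecond, 1L));
        ZoneOffsetTransition next = zoneRules.nextTransition(instant);
        validFromEpochSecond = previous == null ? Long.MIN_VALUE : previous.getInstant().getEpochSecond();
        validUntilEpochSecond = next == null ? Long.MAX_VALUE : next.getInstant().getEpochSecond();
        cachedOffsetSeconds = zoneRules.getOffset(instant).getTotalSeconds();
        return cachedOffsetSeconds;
    }

    public int refreshCount() {
        return refreshCount;
    }

    public static void main(String[] args) {
        OffsetCacheSketch berlin = new OffsetCacheSketch(ZoneId.of("Europe/Berlin"));
        long jan1 = 1704067200L; // 2024-01-01T00:00:00Z
        long jul1 = 1719792000L; // 2024-07-01T00:00:00Z
        System.out.println(berlin.offsetSeconds(jan1));         // 3600 (CET)
        System.out.println(berlin.offsetSeconds(jan1 + 86400)); // 3600, served from the cache
        System.out.println(berlin.offsetSeconds(jul1));         // 7200 (CEST)
        System.out.println(berlin.refreshCount());              // 2
    }
}
```

The +1 ns passed to previousTransition matters only when a value lands exactly on a transition. previousTransition is exclusive, so without the nudge the window would start at the earlier transition. The cached post-transition offset would then be applied to seconds before the switch. For a similar reason, toEpochSecond in the patch uses Math.floorDiv rather than /. With floorDiv, a pre-1970 value such as -1 ms maps to second -1 instead of 0.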