From Hussain Towaileb <[email protected]>:

Hussain Towaileb has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21314?usp=email )

Change subject: [NO ISSUE][EXT]: fixes for timezone
......................................................................

[NO ISSUE][EXT]: fixes for timezone

Details:
- allow zone ID values such as GMT+01:00
- when a timezone is configured for an Iceberg collection, only shift the
  UTC-adjusted timestamp; the wall-clock timestamp must remain untouched.
- snapshots should not be affected by the provided timezone.
- use arithmetic for date calculations to avoid creating objects
  unnecessarily.
- warn if a decimal-to-double conversion overflows to infinity
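A minimal sketch of the last bullet, assuming a plain list stands in for the task's IWarningCollector (the class name and the list are illustrative, not part of this change). BigDecimal.doubleValue() returns positive or negative infinity once the magnitude exceeds the double range, and a per-parser flag keeps the warning from repeating for every overflowing value. The actual change also checks warningCollector.shouldWarn() before warning.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

public class DecimalToDoubleSketch {
    // Hypothetical stand-in for the task's warning collector
    private final List<String> warnings = new ArrayList<>();
    // Mirrors decimalOverflowWarned: warn once per parser instance, not once per value
    private boolean decimalOverflowWarned = false;

    double convert(BigDecimal value) {
        // doubleValue() yields +/-Infinity when the magnitude is too large for a double
        double d = value.doubleValue();
        if (!Double.isFinite(d) && !decimalOverflowWarned) {
            warnings.add("decimal value overflows double representation; infinity will be stored");
            decimalOverflowWarned = true;
        }
        return d;
    }

    List<String> getWarnings() {
        return warnings;
    }

    public static void main(String[] args) {
        DecimalToDoubleSketch s = new DecimalToDoubleSketch();
        System.out.println(s.convert(new BigDecimal("12345.6789"))); // 12345.6789
        System.out.println(s.convert(new BigDecimal("1e400")));      // Infinity
        System.out.println(s.convert(new BigDecimal("-1e400")));     // -Infinity
        System.out.println(s.getWarnings().size());                  // 1
    }
}
```

The value is still stored as infinity; the warning only tells the user that precision was lost.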

Ext-ref: MB-72261
Change-Id: I2887ab0d9a79bf555a38596fdf941ff88d2e7aab
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21314
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Hussain Towaileb <[email protected]>
Tested-by: Jenkins <[email protected]>
Tested-by: Hussain Towaileb <[email protected]>
Integration-Tests: Hussain Towaileb <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.010.ddl.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.050.query.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.060.query.sqlpp
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.999.ddl.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/negative/invalid-timezone/test.000.ddl.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/negative/invalid-timezone/test.010.ddl.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/negative/invalid-timezone/test.999.ddl.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.050.adm
A asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.060.adm
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_iceberg.xml
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/converter/IcebergConverterContext.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergSnapshotUtils.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
15 files changed, 449 insertions(+), 51 deletions(-)

Approvals:
  Hussain Towaileb: Looks good to me, but someone else must approve; Verified; Verified
  Jenkins: Verified; Verified
  Murtadha Hubail: Looks good to me, approved




diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.010.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.010.ddl.sqlpp
index ba010cd..d2cd914 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.010.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.010.ddl.sqlpp
@@ -72,4 +72,42 @@
     ("catalogName"="myNessieCatalog"),
     ("format"="parquet"),
     ("decimal-to-double"="true")
+);
+
+CREATE TYPE allTypesTimesNotConvertedZoneId AS open {};
+CREATE EXTERNAL COLLECTION allTypesTimesNotConvertedZoneId(allTypesTimesNotConvertedZoneId)
+USING S3
+(
+    ("table-format"="iceberg"),
+    ("namespace"="my_namespace"),
+    ("tableName"="allTypes"),
+    ("region"="custom-region"),
+    ("serviceEndpoint"="%DOCKER_S3_CONTAINER%"),
+    ("pathStyleAddressing"="true"),
+    ("catalogName"="myNessieCatalog"),
+    ("format"="parquet"),
+    ("decimal-to-double"="true"),
+    ("date-to-int"="false"),
+    ("time-to-int"="false"),
+    ("timestamp-to-long"="false"),
+    ("timezone"="GMT+01:00")
+);
+
+CREATE TYPE allTypesTimesNotConvertedZoneIdByName AS open {};
+CREATE EXTERNAL COLLECTION allTypesTimesNotConvertedZoneIdByName(allTypesTimesNotConvertedZoneIdByName)
+USING S3
+(
+    ("table-format"="iceberg"),
+    ("namespace"="my_namespace"),
+    ("tableName"="allTypes"),
+    ("region"="custom-region"),
+    ("serviceEndpoint"="%DOCKER_S3_CONTAINER%"),
+    ("pathStyleAddressing"="true"),
+    ("catalogName"="myNessieCatalog"),
+    ("format"="parquet"),
+    ("decimal-to-double"="true"),
+    ("date-to-int"="false"),
+    ("time-to-int"="false"),
+    ("timestamp-to-long"="false"),
+    ("timezone"="Africa/Lagos")
 );
\ No newline at end of file
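The two collections above differ only in how the zone is spelled. A small sketch (class and method names are hypothetical) showing that the offset-style ID GMT+01:00 and the region ID Africa/Lagos resolve to the same +01:00 offset for a 2024 instant. This is consistent with result.050.adm and result.060.adm sharing the same blob hash (a6c1db5).

```java
import java.time.Instant;
import java.time.ZoneId;

public class ZoneIdFormsSketch {
    // Offset, in seconds, that the given zone applies at the given instant
    static int offsetSeconds(String zoneId, Instant at) {
        return ZoneId.of(zoneId).getRules().getOffset(at).getTotalSeconds();
    }

    public static void main(String[] args) {
        Instant at = Instant.parse("2024-02-03T12:00:00Z");
        // Offset-style ID: a fixed +01:00 offset with a "GMT" prefix
        System.out.println(offsetSeconds("GMT+01:00", at));    // 3600
        // Region-style ID: West Africa Time, also +01:00 in 2024
        System.out.println(offsetSeconds("Africa/Lagos", at)); // 3600
        // Only the offset-style ID has fixed-offset rules
        System.out.println(ZoneId.of("GMT+01:00").getRules().isFixedOffset()); // true
    }
}
```

The region ID carries historical transitions, so under this change it would take the cached-transition path rather than the fixed-offset shortcut, even though the result is the same.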
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.050.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.050.query.sqlpp
new file mode 100644
index 0000000..fd6e649
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.050.query.sqlpp
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+SELECT
+    bool_field,
+    byte_field,
+    short_field,
+    int_field,
+    long_field,
+    float_field,
+    double_field,
+    decimal_field,
+    string_field,
+    varchar_field,
+    char_field,
+    uuid_field,
+    print_binary(binary_field, "base64") as binary_field,
+    print_binary(fixed_field, "base64") as fixed_field,
+    date_field,
+    time_field,
+    timestamp_field,
+    timestamp_ntz_field,
+    timestamp_nano_field,
+    interval_ym_field,
+    interval_dt_field,
+    print_binary(geometry_field, "base64") as geometry_field,
+    print_binary(geography_field, "base64") as geography_field,
+    struct_field,
+    list_field,
+    map_field,
+    variant_field,
+    unknown_field
+FROM allTypesTimesNotConvertedZoneId;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.060.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.060.query.sqlpp
new file mode 100644
index 0000000..3c57332
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.060.query.sqlpp
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+SELECT
+    bool_field,
+    byte_field,
+    short_field,
+    int_field,
+    long_field,
+    float_field,
+    double_field,
+    decimal_field,
+    string_field,
+    varchar_field,
+    char_field,
+    uuid_field,
+    print_binary(binary_field, "base64") as binary_field,
+    print_binary(fixed_field, "base64") as fixed_field,
+    date_field,
+    time_field,
+    timestamp_field,
+    timestamp_ntz_field,
+    timestamp_nano_field,
+    interval_ym_field,
+    interval_dt_field,
+    print_binary(geometry_field, "base64") as geometry_field,
+    print_binary(geography_field, "base64") as geography_field,
+    struct_field,
+    list_field,
+    map_field,
+    variant_field,
+    unknown_field
+FROM allTypesTimesNotConvertedZoneIdByName;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.999.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.999.ddl.sqlpp
index 6fa5c73..d0468b9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.999.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/all-data-types/test.999.ddl.sqlpp
@@ -20,7 +20,11 @@
 DROP COLLECTION allTypesTimesConverted;
 DROP COLLECTION allTypesTimesNotConverted;
 DROP COLLECTION allTypesTimesDefaultExpectingNotConverted;
+DROP COLLECTION allTypesTimesNotConvertedZoneId;
+DROP COLLECTION allTypesTimesNotConvertedZoneIdByName;
 DROP TYPE allTypesTimesConverted;
 DROP TYPE allTypesTimesNotConverted;
 DROP TYPE allTypesTimesDefaultExpectingNotConverted;
+DROP TYPE allTypesTimesNotConvertedZoneId;
+DROP TYPE allTypesTimesNotConvertedZoneIdByName;
 DROP CATALOG myNessieCatalog;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/negative/invalid-timezone/test.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/negative/invalid-timezone/test.000.ddl.sqlpp
new file mode 100644
index 0000000..e4a3976
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/negative/invalid-timezone/test.000.ddl.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+CREATE CATALOG myNessieCatalog
+TYPE Iceberg
+SOURCE NESSIE
+WITH {
+    "uri": "%NESSIE_SERVER_URI%",
+    "warehouse": "s3://iceberg-container/nessie/warehouse"
+};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/negative/invalid-timezone/test.010.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/negative/invalid-timezone/test.010.ddl.sqlpp
new file mode 100644
index 0000000..30fbbac
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/negative/invalid-timezone/test.010.ddl.sqlpp
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP COLLECTION users IF EXISTS;
+DROP TYPE users IF EXISTS;
+
+CREATE TYPE users AS open {};
+CREATE EXTERNAL COLLECTION badSnapshot(users)
+USING S3
+(
+    ("table-format"="iceberg"),
+    ("namespace"="my_namespace"),
+    ("tableName"="users"),
+    ("region"="custom-region"),
+    ("serviceEndpoint"="%DOCKER_S3_CONTAINER%"),
+    ("pathStyleAddressing"="true"),
+    ("catalogName"="myNessieCatalog"),
+    ("format"="parquet"),
+    ("decimal-to-double"="true"),
+    ("date-to-int"="true"),
+    ("time-to-int"="true"),
+    ("timestamp-to-long"="true"),
+    ("timezone"="GMT+Zft!")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/negative/invalid-timezone/test.999.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/negative/invalid-timezone/test.999.ddl.sqlpp
new file mode 100644
index 0000000..906b1e1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/negative/invalid-timezone/test.999.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP CATALOG myNessieCatalog;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.050.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.050.adm
new file mode 100644
index 0000000..a6c1db5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.050.adm
@@ -0,0 +1 @@
+{ "binary_field": "AQIDBAUGBwg=", "fixed_field": "SGVsbG8gV29ybGQ=", "geometry_field": "AQIDBAUGBwgJCg==", "geography_field": "AQIDBAUGBwgJCg0=", "bool_field": true, "byte_field": 42, "short_field": 1000, "int_field": 42, "long_field": 9223372036854775807, "float_field": 3.14, "double_field": 2.718281828459045, "decimal_field": 12345.6789, "string_field": "hello world", "varchar_field": "varchar value one", "char_field": "Hi", "uuid_field": uuid("550e8400-e29b-41d4-a716-446655440000"), "date_field": date("2024-01-01"), "time_field": time("10:20:30.000"), "timestamp_field": datetime("2024-02-03T23:40:00.000"), "timestamp_ntz_field": datetime("2024-02-04T12:00:00.000"), "timestamp_nano_field": datetime("2024-02-03T23:40:00.000"), "interval_ym_field": 14, "interval_dt_field": 37230000000, "struct_field": { "name": "Alice", "age": 30, "active": true }, "list_field": [ "a", "b", "c" ], "map_field": { "key1": "value1", "key2": "100" }, "variant_field": "string value", "unknown_field": null }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.060.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.060.adm
new file mode 100644
index 0000000..a6c1db5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/iceberg/all-data-types/result.060.adm
@@ -0,0 +1 @@
+{ "binary_field": "AQIDBAUGBwg=", "fixed_field": "SGVsbG8gV29ybGQ=", "geometry_field": "AQIDBAUGBwgJCg==", "geography_field": "AQIDBAUGBwgJCg0=", "bool_field": true, "byte_field": 42, "short_field": 1000, "int_field": 42, "long_field": 9223372036854775807, "float_field": 3.14, "double_field": 2.718281828459045, "decimal_field": 12345.6789, "string_field": "hello world", "varchar_field": "varchar value one", "char_field": "Hi", "uuid_field": uuid("550e8400-e29b-41d4-a716-446655440000"), "date_field": date("2024-01-01"), "time_field": time("10:20:30.000"), "timestamp_field": datetime("2024-02-03T23:40:00.000"), "timestamp_ntz_field": datetime("2024-02-04T12:00:00.000"), "timestamp_nano_field": datetime("2024-02-03T23:40:00.000"), "interval_ym_field": 14, "interval_dt_field": 37230000000, "struct_field": { "name": "Alice", "age": 30, "active": true }, "list_field": [ "a", "b", "c" ], "map_field": { "key1": "value1", "key2": "100" }, "variant_field": "string value", "unknown_field": null }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_iceberg.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_iceberg.xml
index 9797ccb..69564bd 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_iceberg.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_iceberg.xml
@@ -97,5 +97,11 @@
         <expected-error>ASX1108: External source error. unexpected TIMESTAMP snapshot format. Allow formats are: (milliseconds, yyyy-MM-dd or yyyy-MM-dd'T'HH:mm:ss). Found: 2024-12-12 11:11:11.123</expected-error>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="iceberg/negative">
+      <compilation-unit name="invalid-timezone">
+        <output-dir compare="Text">invalid-timezone</output-dir>
+        <expected-error>ASX1172: Provided timezone is invalid: 'GMT+Zft!'</expected-error>
+      </compilation-unit>
+    </test-case>
   </test-group>
 </test-suite>
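The new negative test expects ASX1172 for 'GMT+Zft!'. One plausible way to detect this, sketched under the assumption that validation relies on java.time.ZoneId.of (the ExternalDataUtils hunk that performs the check is not shown here, so validateTimezone is a hypothetical name). An explicit check matters because TimeZone.getTimeZone, which IcebergConverterContext uses to build the zone, never fails: it silently falls back to GMT for IDs it cannot parse.

```java
import java.time.DateTimeException;
import java.time.ZoneId;
import java.util.TimeZone;

public class TimezoneValidationSketch {
    // Hypothetical helper; accepts region IDs (Africa/Lagos) and prefixed offsets (GMT+01:00)
    static ZoneId validateTimezone(String id) {
        try {
            return ZoneId.of(id);
        } catch (DateTimeException e) {
            throw new IllegalArgumentException("Provided timezone is invalid: '" + id + "'", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(validateTimezone("GMT+01:00"));    // GMT+01:00
        System.out.println(validateTimezone("Africa/Lagos")); // Africa/Lagos
        // TimeZone does not reject bad input; it falls back to GMT
        System.out.println(TimeZone.getTimeZone("GMT+Zft!").getID()); // GMT
        try {
            validateTimezone("GMT+Zft!");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Provided timezone is invalid: 'GMT+Zft!'
        }
    }
}
```

Validating with ZoneId.of up front means a typo fails the DDL instead of quietly reading timestamps as GMT.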
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/converter/IcebergConverterContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/converter/IcebergConverterContext.java
index c7713ac..7208851 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/converter/IcebergConverterContext.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/converter/IcebergConverterContext.java
@@ -25,14 +25,11 @@
 import static org.apache.asterix.external.util.ExternalDataConstants.IcebergOptions.TIME_AS_INT;

 import java.time.ZoneId;
-import java.time.ZoneOffset;
-import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;

 import org.apache.asterix.external.parser.jackson.ParserContext;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.hyracks.api.exceptions.Warning;
 
 public class IcebergConverterContext extends ParserContext {

@@ -41,10 +38,8 @@
     private final boolean timeAsInt;
     private final boolean timestampAsLong;
     private final ZoneId timeZoneId;
-    private final List<Warning> warnings;

-    public IcebergConverterContext(Map<String, String> configuration, List<Warning> warnings) {
-        this.warnings = warnings;
+    public IcebergConverterContext(Map<String, String> configuration) {
         decimalToDouble = Boolean.parseBoolean(configuration.getOrDefault(DECIMAL_TO_DOUBLE, FALSE));
         dateAsInt = Boolean.parseBoolean(configuration.getOrDefault(DATE_AS_INT, FALSE));
         timeAsInt = Boolean.parseBoolean(configuration.getOrDefault(TIME_AS_INT, FALSE));
@@ -54,7 +49,7 @@
         if (configuredTimeZoneId != null && !configuredTimeZoneId.isEmpty()) {
             timeZoneId = TimeZone.getTimeZone(configuredTimeZoneId).toZoneId();
         } else {
-            timeZoneId = ZoneOffset.UTC;
+            timeZoneId = null;
         }
     }

@@ -77,8 +72,4 @@
     public boolean isDateAsInt() {
         return dateAsInt;
     }
-
-    public List<Warning> getWarnings() {
-        return warnings;
-    }
 }
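With timeZoneId now left null when no timezone is configured, the parser can treat the two timestamp kinds differently. A simplified sketch of that split under these assumptions: microsecond precision only, the zone passed in directly, and a class name that is illustrative rather than part of this change. Wall-clock values (LocalDateTime) are never shifted. UTC-adjusted values (OffsetDateTime) are shifted by the zone's offset only when a zone is set. Both conversions use Math.multiplyExact/addExact instead of ChronoUnit, so overflow raises an ArithmeticException instead of wrapping.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class TimestampProjectionSketch {
    // Exact arithmetic: throws ArithmeticException instead of silently wrapping on overflow
    static long toMicros(long epochSecond, int nano) {
        return Math.addExact(Math.multiplyExact(epochSecond, 1_000_000L), nano / 1_000L);
    }

    // Wall-clock timestamp (no zone): fields are read as if at UTC and never shifted
    static long wallClockMicros(LocalDateTime ldt) {
        return toMicros(ldt.toEpochSecond(ZoneOffset.UTC), ldt.getNano());
    }

    // UTC-adjusted timestamp: shifted by the zone's offset at that instant;
    // a null zone (no "timezone" option) leaves the UTC value untouched
    static long utcAdjustedMicros(OffsetDateTime odt, ZoneId zone) {
        long micros = toMicros(odt.toEpochSecond(), odt.getNano());
        if (zone == null) {
            return micros;
        }
        int offsetSeconds = zone.getRules().getOffset(odt.toInstant()).getTotalSeconds();
        return Math.addExact(micros, offsetSeconds * 1_000_000L);
    }

    // Render epoch micros as a LocalDateTime for display
    static LocalDateTime fromMicros(long micros) {
        return LocalDateTime.ofEpochSecond(Math.floorDiv(micros, 1_000_000L),
                (int) Math.floorMod(micros, 1_000_000L) * 1_000, ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("GMT+01:00");
        LocalDateTime wallClock = LocalDateTime.parse("2024-06-01T08:30:00");
        OffsetDateTime utcAdjusted = OffsetDateTime.parse("2024-06-01T08:30:00Z");
        System.out.println(fromMicros(wallClockMicros(wallClock)));           // 2024-06-01T08:30
        System.out.println(fromMicros(utcAdjustedMicros(utcAdjusted, null))); // 2024-06-01T08:30
        System.out.println(fromMicros(utcAdjustedMicros(utcAdjusted, zone))); // 2024-06-01T09:30
    }
}
```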
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
index 79840f2..aaf37d0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
@@ -31,8 +31,9 @@
 import java.time.LocalTime;
 import java.time.OffsetDateTime;
 import java.time.ZoneId;
-import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
+import java.time.ZoneOffset;
+import java.time.zone.ZoneOffsetTransition;
+import java.time.zone.ZoneRules;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -54,6 +55,7 @@
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.avro.AvroRuntimeException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.Warning;
 import org.apache.hyracks.data.std.api.IMutableValueStorage;
 import org.apache.hyracks.data.std.api.IValueReference;
@@ -69,13 +71,17 @@
     private final IcebergConverterContext parserContext;
     private final IExternalFilterValueEmbedder valueEmbedder;
     private final Schema projectedSchema;
+    private final TimestampZoneProjector timestampZoneProjector;
+    private final IWarningCollector warningCollector;
+    private boolean decimalOverflowWarned = false;

     public IcebergParquetDataParser(IExternalDataRuntimeContext context, Map<String, String> conf,
             Schema projectedSchema) {
-        List<Warning> warnings = new ArrayList<>();
-        parserContext = new IcebergConverterContext(conf, warnings);
+        parserContext = new IcebergConverterContext(conf);
         valueEmbedder = context.getValueEmbedder();
         this.projectedSchema = projectedSchema;
+        timestampZoneProjector = new TimestampZoneProjector(parserContext.getTimeZoneId());
+        warningCollector = context.getTaskContext().getWarningCollector();
     }

     @Override
@@ -306,7 +312,13 @@
     }

     private void serializeDecimal(BigDecimal value, DataOutput out) throws HyracksDataException {
-        serializeDouble(value.doubleValue(), out);
+        double doubleValue = value.doubleValue();
+        if (warningCollector.shouldWarn() && !Double.isFinite(doubleValue) && !decimalOverflowWarned) {
+            warningCollector.warn(Warning.of(null, ErrorCode.EXTERNAL_SOURCE_ERROR,
+                    "decimal value overflows double representation; infinity will be stored"));
+            decimalOverflowWarned = true;
+        }
+        serializeDouble(doubleValue, out);
     }

     private void serializeBinary(Object value, DataOutput out) throws 
HyracksDataException {
@@ -350,29 +362,81 @@
     }

     private void serializeTimestamp(Type type, Object value, DataOutput output) throws HyracksDataException {
-        Instant instant;
         switch (value) {
-            case OffsetDateTime offsetDateTime -> instant = offsetDateTime.toInstant();
-            case LocalDateTime localDateTime -> {
-                ZoneId zoneId = parserContext.getTimeZoneId();
-                instant = localDateTime.atZone(zoneId).toInstant();
-            }
+            case LocalDateTime localDateTime -> serializeWallClockTimestamp(type, localDateTime, output);
+            case OffsetDateTime offsetDateTime -> serializeUtcAdjustedTimestamp(type, offsetDateTime, output);
             case null, default -> throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR,
                     value == null ? "unexpected null value for field type (" + type + ")"
                             : "unexpected value type (" + value.getClass() + ") for field type (" + type + ")");
         }
+    }

+    private void serializeWallClockTimestamp(Type type, LocalDateTime localDateTime, DataOutput output)
+            throws HyracksDataException {
+        long epochSecond = localDateTime.toEpochSecond(ZoneOffset.UTC);
+        int nano = localDateTime.getNano();
         if (parserContext.isTimestampAsLong()) {
-            long timestampAsLong = type.typeId() == Type.TypeID.TIMESTAMP_NANO
-                    ? ChronoUnit.NANOS.between(Instant.EPOCH, instant)
-                    : ChronoUnit.MICROS.between(Instant.EPOCH, instant);
-            serializeLong(timestampAsLong, output);
+            long value = isTimestampNano(type) ? toNanos(epochSecond, nano) : toMicros(epochSecond, nano);
+            serializeLong(value, output);
         } else {
-            aDateTime.setValue(instant.toEpochMilli());
+            aDateTime.setValue(toMillis(epochSecond, nano));
             datetimeSerde.serialize(aDateTime, output);
         }
     }

+    private void serializeUtcAdjustedTimestamp(Type type, OffsetDateTime offsetDateTime, DataOutput output)
+            throws HyracksDataException {
+        long epochSecond = offsetDateTime.toEpochSecond();
+        int nano = offsetDateTime.getNano();
+        if (parserContext.isTimestampAsLong()) {
+            TimestampUnit unit = getTimestampUnit(type);
+            long value = unit == TimestampUnit.NANOS ? toNanos(epochSecond, nano) : toMicros(epochSecond, nano);
+            value = timestampZoneProjector.projectEpochValue(value, unit);
+            serializeLong(value, output);
+        } else {
+            long epochMillis = toMillis(epochSecond, nano);
+            epochMillis = timestampZoneProjector.projectEpochMillis(epochMillis);
+            aDateTime.setValue(epochMillis);
+            datetimeSerde.serialize(aDateTime, output);
+        }
+    }
+
+    private static TimestampUnit getTimestampUnit(Type type) {
+        return isTimestampNano(type) ? TimestampUnit.NANOS : TimestampUnit.MICROS;
+    }
+
+    private static boolean isTimestampNano(Type type) {
+        return type.typeId() == Type.TypeID.TIMESTAMP_NANO;
+    }
+
+    private static long toMillis(long epochSecond, int nano) throws HyracksDataException {
+        try {
+            return Math.addExact(Math.multiplyExact(epochSecond, 1_000L), nano / 1_000_000L);
+        } catch (ArithmeticException ex) {
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ex,
+                    "timestamp value overflows long representation in milliseconds");
+        }
+    }
+
+    private static long toMicros(long epochSecond, int nano) throws HyracksDataException {
+        try {
+            return Math.addExact(Math.multiplyExact(epochSecond, 1_000_000L), nano / 1_000L);
+        } catch (ArithmeticException ex) {
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ex,
+                    "timestamp value overflows long representation in microseconds");
+        }
+    }
+
+    // Epoch nanoseconds stored in a long overflow outside roughly 1677-09-21 to 2262-04-11
+    private static long toNanos(long epochSecond, int nano) throws HyracksDataException {
+        try {
+            return Math.addExact(Math.multiplyExact(epochSecond, 1_000_000_000L), nano);
+        } catch (ArithmeticException ex) {
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ex,
+                    "timestamp value overflows long representation in nanoseconds");
+        }
+    }
+
     private static HyracksDataException createUnsupportedException(Type type) {
         return new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, "Iceberg Parser", type.toString());
     }
@@ -429,4 +493,101 @@
             default -> throw createUnsupportedException(type);
         };
     }
+
+    private enum TimestampUnit {
+        MILLIS,
+        MICROS,
+        NANOS
+    }
+
+    // Not thread-safe by design: one instance per parser, parsers are per-task/per-thread.
+    private static final class TimestampZoneProjector {
+        private static final long MILLIS_PER_SECOND = 1_000L;
+        private static final long MICROS_PER_SECOND = 1_000_000L;
+        private static final long NANOS_PER_SECOND = 1_000_000_000L;
+
+        private final boolean enabled;
+        private final ZoneRules zoneRules;
+        private final boolean fixedOffsetZone;
+        private final int fixedOffsetSeconds;
+
+        private long validFromEpochSecond = Long.MIN_VALUE;
+        private long validUntilEpochSecond = Long.MIN_VALUE;
+        private int cachedOffsetSeconds;
+
+        private TimestampZoneProjector(ZoneId zoneId) {
+            enabled = zoneId != null;
+
+            if (enabled) {
+                zoneRules = zoneId.getRules();
+                fixedOffsetZone = zoneRules.isFixedOffset();
+                fixedOffsetSeconds = fixedOffsetZone ? zoneRules.getOffset(Instant.EPOCH).getTotalSeconds() : 0;
+            } else {
+                zoneRules = null;
+                fixedOffsetZone = true;
+                fixedOffsetSeconds = 0;
+            }
+        }
+
+        private long projectEpochMillis(long epochMillis) throws HyracksDataException {
+            return projectEpochValue(epochMillis, TimestampUnit.MILLIS);
+        }
+
+        private long projectEpochValue(long epochValue, TimestampUnit unit) throws HyracksDataException {
+            if (!enabled) {
+                return epochValue;
+            }
+
+            int offsetSeconds = getOffsetSeconds(epochValue, unit);
+
+            try {
+                return switch (unit) {
+                    case MILLIS -> Math.addExact(epochValue, offsetSeconds * MILLIS_PER_SECOND);
+                    case MICROS -> Math.addExact(epochValue, offsetSeconds * MICROS_PER_SECOND);
+                    case NANOS -> Math.addExact(epochValue, offsetSeconds * NANOS_PER_SECOND);
+                };
+            } catch (ArithmeticException ex) {
+                throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ex,
+                        "timestamp value overflows long representation after applying timezone configuration");
+            }
+        }
+
+        private int getOffsetSeconds(long epochValue, TimestampUnit unit) {
+            if (fixedOffsetZone) {
+                return fixedOffsetSeconds;
+            }
+
+            long epochSecond = toEpochSecond(epochValue, unit);
+
+            if (epochSecond >= validFromEpochSecond && epochSecond < validUntilEpochSecond) {
+                return cachedOffsetSeconds;
+            }
+
+            return refreshOffsetCache(epochSecond);
+        }
+
+        private int refreshOffsetCache(long epochSecond) {
+            Instant instant = Instant.ofEpochSecond(epochSecond);
+
+            ZoneOffset offset = zoneRules.getOffset(instant);
+            // Use epochSecond + 1ns so that if the record falls exactly on a transition boundary,
+            // previousTransition captures that transition as the start of the current offset period.
+            ZoneOffsetTransition previous = zoneRules.previousTransition(Instant.ofEpochSecond(epochSecond, 1L));
+            ZoneOffsetTransition next = zoneRules.nextTransition(instant);
+
+            validFromEpochSecond = previous == null ? Long.MIN_VALUE : previous.getInstant().getEpochSecond();
+            validUntilEpochSecond = next == null ? Long.MAX_VALUE : next.getInstant().getEpochSecond();
+            cachedOffsetSeconds = offset.getTotalSeconds();
+
+            return cachedOffsetSeconds;
+        }
+
+        private static long toEpochSecond(long epochValue, TimestampUnit unit) {
+            return switch (unit) {
+                case MILLIS -> Math.floorDiv(epochValue, MILLIS_PER_SECOND);
+                case MICROS -> Math.floorDiv(epochValue, MICROS_PER_SECOND);
+                case NANOS -> Math.floorDiv(epochValue, NANOS_PER_SECOND);
+            };
+        }
+    }
 }
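The offset projection above keeps the zone offset that is valid between the two surrounding `ZoneRules` transitions and only consults `ZoneRules` again when an epoch value leaves that window. Below is a minimal standalone sketch of the same idea, assuming millisecond input only. `OffsetProjector` is a hypothetical name, not an AsterixDB class, and it lets the `ArithmeticException` from `Math.addExact` propagate where the patch wraps it in `RuntimeDataException`.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffsetTransition;
import java.time.zone.ZoneRules;

// Hypothetical standalone sketch (not AsterixDB code): shifts a UTC epoch-millis value to the
// wall-clock epoch-millis value of a zone, caching the offset between two zone transitions.
final class OffsetProjector {
    private static final long MILLIS_PER_SECOND = 1_000L;

    private final ZoneRules rules;
    private final boolean fixedOffset;
    private final int fixedOffsetSeconds;

    // The cached offset is valid for epoch seconds in [validFrom, validUntil).
    // Starting with an empty interval forces a ZoneRules lookup on the first call.
    private long validFrom = 1L;
    private long validUntil = 0L;
    private int cachedOffsetSeconds;

    OffsetProjector(ZoneId zone) {
        this.rules = zone.getRules();
        this.fixedOffset = rules.isFixedOffset();
        this.fixedOffsetSeconds = rules.getOffset(Instant.EPOCH).getTotalSeconds();
    }

    long project(long epochMillis) {
        // floorDiv rounds toward negative infinity, so pre-1970 values map to the right second
        long epochSecond = Math.floorDiv(epochMillis, MILLIS_PER_SECOND);
        int offsetSeconds = offsetSeconds(epochSecond);
        // addExact throws ArithmeticException on overflow instead of silently wrapping around
        return Math.addExact(epochMillis, offsetSeconds * MILLIS_PER_SECOND);
    }

    private int offsetSeconds(long epochSecond) {
        if (fixedOffset) {
            return fixedOffsetSeconds; // e.g. GMT+01:00 never changes, no lookup needed
        }
        if (epochSecond >= validFrom && epochSecond < validUntil) {
            return cachedOffsetSeconds; // cache hit: same offset period as the previous value
        }
        Instant instant = Instant.ofEpochSecond(epochSecond);
        // +1ns so a value exactly on a transition treats that transition as the period start
        ZoneOffsetTransition previous = rules.previousTransition(Instant.ofEpochSecond(epochSecond, 1L));
        ZoneOffsetTransition next = rules.nextTransition(instant);
        validFrom = previous == null ? Long.MIN_VALUE : previous.getInstant().getEpochSecond();
        validUntil = next == null ? Long.MAX_VALUE : next.getInstant().getEpochSecond();
        cachedOffsetSeconds = rules.getOffset(instant).getTotalSeconds();
        return cachedOffsetSeconds;
    }
}
```

A note on the design: `Math.floorDiv` rather than `/` matters for pre-1970 values, since `-1 / 1000` is `0` in Java while `Math.floorDiv(-1, 1000)` is `-1`, which is the second that actually contains the instant.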
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 7eb1b9c..713a1fb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.external.util;

+import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
 import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE;
 import static org.apache.asterix.common.exceptions.ErrorCode.PARSE_ERROR;
 import static org.apache.asterix.common.exceptions.ErrorCode.PROPERTY_INVALID_VALUE_TYPE;
@@ -52,6 +53,7 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Base64;
@@ -60,9 +62,9 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.TimeZone;
 import java.util.function.BiPredicate;
 import java.util.regex.Matcher;
@@ -144,7 +146,7 @@

 public class ExternalDataUtils {

-    private static final Set<String> validTimeZones = Set.of(TimeZone.getAvailableIDs());
+    private static final Map<String, String> validTimeZonesMap = new HashMap<>();
     private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = new EnumMap<>(ATypeTag.class);
     private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024;
     private static final int HEADER_FUDGE = 64;
@@ -158,6 +160,9 @@
         valueParserFactoryMap.put(ATypeTag.BIGINT, LongParserFactory.INSTANCE);
         valueParserFactoryMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
         valueParserFactoryMap.put(ATypeTag.BOOLEAN, BooleanParserFactory.INSTANCE);
+        for (String id : TimeZone.getAvailableIDs()) {
+            validTimeZonesMap.put(id.toLowerCase(Locale.ROOT), id);
+        }
     }

     private ExternalDataUtils() {
@@ -539,10 +544,9 @@
             throw new CompilationException(ErrorCode.INVALID_DELTA_TABLE_FORMAT,
                     configuration.get(ExternalDataConstants.KEY_FORMAT));
         }
-        if (configuration.containsKey(ExternalDataConstants.DeltaOptions.TIMEZONE)
-                && !validTimeZones.contains(configuration.get(ExternalDataConstants.DeltaOptions.TIMEZONE))) {
-            throw new CompilationException(ErrorCode.INVALID_TIMEZONE,
-                    configuration.get(ExternalDataConstants.DeltaOptions.TIMEZONE));
+        if (configuration.containsKey(ExternalDataConstants.DeltaOptions.TIMEZONE)) {
+            String resolved = resolveTimeZone(configuration.get(ExternalDataConstants.DeltaOptions.TIMEZONE));
+            configuration.put(ExternalDataConstants.DeltaOptions.TIMEZONE, resolved);
         }
     }

@@ -1011,21 +1015,26 @@
         return isParquetFormat(properties) || isDeltaTable(properties) || isIcebergTable(properties);
     }

-    /**
-     * Validate Parquet dataset's declared type and configuration
-     *
-     * @param properties        external dataset configuration
-     * @param datasetRecordType dataset declared type
-     */
+    public static String resolveTimeZone(String timeZoneId) throws CompilationException {
+        String canonical = validTimeZonesMap.get(timeZoneId.toLowerCase(Locale.ROOT));
+        if (canonical != null) {
+            return canonical;
+        }
+        try {
+            return ZoneId.of(timeZoneId).getId();
+        } catch (Exception e) {
+            throw CompilationException.create(ErrorCode.INVALID_TIMEZONE, e, timeZoneId);
+        }
+    }
+
     public static void validateParquetTypeAndConfiguration(Map<String, String> properties,
             ARecordType datasetRecordType) throws CompilationException {
         if (isParquetFormat(properties)) {
             if (datasetRecordType.getFieldTypes().length != 0) {
                 throw new CompilationException(ErrorCode.UNSUPPORTED_TYPE_FOR_PARQUET, datasetRecordType.getTypeName());
-            } else if (properties.containsKey(ParquetOptions.TIMEZONE)
-                    && !validTimeZones.contains(properties.get(ParquetOptions.TIMEZONE))) {
-                //Ensure the configured time zone id is correct
-                throw new CompilationException(ErrorCode.INVALID_TIMEZONE, properties.get(ParquetOptions.TIMEZONE));
+            } else if (properties.containsKey(ParquetOptions.TIMEZONE)) {
+                String resolved = resolveTimeZone(properties.get(ParquetOptions.TIMEZONE));
+                properties.put(ParquetOptions.TIMEZONE, resolved);
             }
         }
     }
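`resolveTimeZone` first does a case-insensitive lookup against `TimeZone.getAvailableIDs()` and only then falls back to `ZoneId.of`, which is what lets offset-style IDs such as `GMT+01:00` through. A hedged standalone sketch of that two-step resolution follows. `TimeZoneResolver` is a hypothetical name, `IllegalArgumentException` stands in for `CompilationException(INVALID_TIMEZONE)`, and the sketch catches `DateTimeException` where the patch catches `Exception`.

```java
import java.time.DateTimeException;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;

// Hypothetical standalone version of the resolution logic (not AsterixDB code).
final class TimeZoneResolver {
    // lower-cased ID -> canonical ID as reported by TimeZone.getAvailableIDs()
    private static final Map<String, String> CANONICAL = new HashMap<>();

    static {
        for (String id : TimeZone.getAvailableIDs()) {
            CANONICAL.put(id.toLowerCase(Locale.ROOT), id);
        }
    }

    private TimeZoneResolver() {
    }

    static String resolve(String timeZoneId) {
        // 1. case-insensitive match against the known IDs, e.g. "europe/paris" -> "Europe/Paris"
        String canonical = CANONICAL.get(timeZoneId.toLowerCase(Locale.ROOT));
        if (canonical != null) {
            return canonical;
        }
        // 2. fall back to java.time parsing, which accepts prefixed offsets such as "GMT+01:00"
        try {
            return ZoneId.of(timeZoneId).getId();
        } catch (DateTimeException e) {
            throw new IllegalArgumentException("invalid timezone: " + timeZoneId, e);
        }
    }
}
```

In the patch, the Delta and Parquet paths write the resolved ID back into the configuration, so later code sees the canonical spelling, while the Iceberg check in `IcebergUtils` only validates the value and discards the result.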
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergSnapshotUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergSnapshotUtils.java
index ab2ffa9..b2a2cdf 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergSnapshotUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergSnapshotUtils.java
@@ -28,7 +28,6 @@

 import java.time.LocalDate;
 import java.time.LocalDateTime;
-import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.time.format.DateTimeParseException;
 import java.util.Map;
@@ -36,7 +35,6 @@

 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
@@ -49,8 +47,6 @@
     public static Optional<Long> validateAndGetSnapshot(Map<String, String> properties) throws CompilationException {
         String snapshotId = properties.get(ICEBERG_SNAPSHOT_ID_PROPERTY_KEY);
         String snapshotTimestamp = properties.get(ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY);
-        String timeZoneId = properties.get(ExternalDataConstants.IcebergOptions.TIMEZONE);
-        ZoneId zoneId = timeZoneId != null ? ZoneId.of(timeZoneId) : ZoneOffset.UTC;
         if (snapshotId != null && snapshotTimestamp != null) {
             throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT,
                     ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY, ICEBERG_SNAPSHOT_ID_PROPERTY_KEY);
@@ -60,7 +56,7 @@
             if (snapshotId != null) {
                 return Optional.of(Long.parseLong(snapshotId));
             } else if (snapshotTimestamp != null) {
-                return Optional.of(parseTimestamp(snapshotTimestamp, zoneId));
+                return Optional.of(parseTimestamp(snapshotTimestamp));
             } else {
                 return Optional.empty();
             }
@@ -70,7 +66,7 @@
         }
     }

-    private static long parseTimestamp(String timestamp, ZoneId zoneId) throws CompilationException {
+    private static long parseTimestamp(String timestamp) throws CompilationException {
         try {
             // try parsing as a long first
             return Long.parseLong(timestamp);
@@ -79,14 +75,14 @@

         try {
             // try parsing as ISO 8601 date, e.g., "yyyy-MM-dd"
-            return LocalDate.parse(timestamp, ISO_LOCAL_DATE).atStartOfDay(zoneId).toInstant().toEpochMilli();
+            return LocalDate.parse(timestamp, ISO_LOCAL_DATE).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
         } catch (DateTimeParseException ignored) {
         }

         try {
             // try parsing as ISO 8601 timestamp, e.g., "yyyy-MM-dd'T'HH:mm:ss"
             LocalDateTime localDateTime = LocalDateTime.parse(timestamp, ISO_LOCAL_DATE_TIME);
-            return localDateTime.atZone(zoneId).toInstant().toEpochMilli();
+            return localDateTime.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
         } catch (DateTimeParseException ignored) {
             throw CompilationException.create(EXTERNAL_SOURCE_ERROR,
                     "unexpected TIMESTAMP snapshot format. Allow formats are: (milliseconds, yyyy-MM-dd or "
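After this change the snapshot timestamp no longer depends on the dataset's timezone: date and date-time strings are always read as UTC. Below is a minimal standalone sketch of the three accepted forms, assuming the elided first `catch` in `parseTimestamp` handles `NumberFormatException`. `SnapshotTimestamps` is a hypothetical name, and `IllegalArgumentException` replaces the `EXTERNAL_SOURCE_ERROR` compilation error.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Hypothetical standalone mirror of the snapshot-timestamp parsing (not AsterixDB code):
// every accepted form is interpreted in UTC, never in the configured dataset timezone.
final class SnapshotTimestamps {
    private SnapshotTimestamps() {
    }

    static long toEpochMillis(String timestamp) {
        try {
            return Long.parseLong(timestamp); // already epoch milliseconds
        } catch (NumberFormatException ignored) {
            // not a number, try the ISO formats below
        }
        try {
            return LocalDate.parse(timestamp, DateTimeFormatter.ISO_LOCAL_DATE)
                    .atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
        } catch (DateTimeParseException ignored) {
            // not a plain date, try date-time next
        }
        try {
            return LocalDateTime.parse(timestamp, DateTimeFormatter.ISO_LOCAL_DATE_TIME)
                    .atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
        } catch (DateTimeParseException e) {
            throw new IllegalArgumentException("unsupported snapshot timestamp: " + timestamp, e);
        }
    }
}
```

With this behavior, `2024-01-01`, `2024-01-01T00:00:00` and `1704067200000` all select the same snapshot cutoff, regardless of the timezone configured on the dataset.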
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
index 6366275..9bd3e67 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
@@ -167,6 +167,13 @@
                     IcebergConstants.ICEBERG_NAMESPACE_PROPERTY_KEY);
         }

+        // if timezone is provided, validate it
+        String timezone = properties.get(ExternalDataConstants.ParquetOptions.TIMEZONE);
+        if (timezone != null && !timezone.isEmpty()) {
+            ExternalDataUtils.resolveTimeZone(timezone);
+        }
+
+        // validate snapshot
         IcebergSnapshotUtils.validateAndGetSnapshot(properties);
     }


--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21314?usp=email

Gerrit-MessageType: merged
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: I2887ab0d9a79bf555a38596fdf941ff88d2e7aab
Gerrit-Change-Number: 21314
Gerrit-PatchSet: 6
Gerrit-Owner: Hussain Towaileb <[email protected]>
Gerrit-Reviewer: Hussain Towaileb <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-CC: Anon. E. Moose #1000171
