This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ce0c2671a0a [HUDI-7647] READ_UTC_TIMEZONE doesn't affect log files for MOR tables (#11066)
ce0c2671a0a is described below

commit ce0c2671a0a5e010173e0e6caf9c21ca2f175a30
Author: Марк Бухнер <66881554+alowa...@users.noreply.github.com>
AuthorDate: Wed Apr 24 08:06:25 2024 +0700

    [HUDI-7647] READ_UTC_TIMEZONE doesn't affect log files for MOR tables (#11066)
---
 .../hudi/source/stats/ColumnStatsIndices.java      |  2 +-
 .../table/format/mor/MergeOnReadInputFormat.java   |  8 ++---
 .../apache/hudi/util/AvroToRowDataConverters.java  | 42 +++++++++++++---------
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 31 ++++++++--------
 4 files changed, 46 insertions(+), 37 deletions(-)
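
[Context, not part of the patch] Before this commit, MergeOnReadInputFormat built its Avro-to-RowData converters without the READ_UTC_TIMEZONE flag, so TIMESTAMP (without time zone) values read from MOR log files were always rendered in UTC even when the option was set to false; parquet base files already honored it. A minimal sketch of the changed API, assuming only the classes shown in the diff below (the RowType is made up for illustration):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.TimestampType;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.util.AvroToRowDataConverters;

    class ReadUtcTimezoneSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Ask for wall clocks in the JVM's local zone instead of UTC.
        conf.setBoolean(FlinkOptions.READ_UTC_TIMEZONE, false);

        // Hypothetical schema: an INT key and a TIMESTAMP(3) field.
        RowType rowType = RowType.of(new IntType(), new TimestampType(3));
        AvroToRowDataConverters.AvroToRowDataConverter converter =
            AvroToRowDataConverters.createRowConverter(
                rowType, conf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE));
        // converter.convert(avroRecord) now honors the flag for log-file records.
      }
    }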

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
index 05931876603..7032f299368 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
@@ -272,7 +272,7 @@ public class ColumnStatsIndices {
       LogicalType logicalType,
      Map<LogicalType, AvroToRowDataConverters.AvroToRowDataConverter> converters) {
     AvroToRowDataConverters.AvroToRowDataConverter converter =
-        converters.computeIfAbsent(logicalType, k -> AvroToRowDataConverters.createConverter(logicalType));
+        converters.computeIfAbsent(logicalType, k -> AvroToRowDataConverters.createConverter(logicalType, true));
     return converter.convert(rawVal);
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 29bb0a06d8c..3690fc911d8 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -351,7 +351,7 @@ public class MergeOnReadInputFormat
     final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
     final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
     final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
-        AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
+        AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType(), conf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE));
     final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, internalSchemaManager.getQuerySchema(), conf, hadoopConf);
     final Iterator<String> logRecordsKeyIterator = scanner.getRecords().keySet().iterator();
     final int[] pkOffset = tableState.getPkOffsetsInRequired();
@@ -431,7 +431,7 @@ public class MergeOnReadInputFormat
     final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
     final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
     final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
-        AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
+        AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType(), conf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE));
     final FormatUtils.BoundedMemoryRecords records = new FormatUtils.BoundedMemoryRecords(split, tableSchema, internalSchemaManager.getQuerySchema(), hadoopConf, conf);
     final Iterator<HoodieRecord<?>> recordsIterator = records.getRecordsIterator();
 
@@ -478,7 +478,7 @@ public class MergeOnReadInputFormat
  protected ClosableIterator<RowData> getFullLogFileIterator(MergeOnReadInputSplit split) {
     final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
     final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
-        AvroToRowDataConverters.createRowConverter(tableState.getRowType());
+        AvroToRowDataConverters.createRowConverter(tableState.getRowType(), conf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE));
     final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, InternalSchema.getEmptyInternalSchema(), conf, hadoopConf);
     final Iterator<String> logRecordsKeyIterator = scanner.getRecords().keySet().iterator();
 
@@ -736,7 +736,7 @@ public class MergeOnReadInputFormat
       this.operationPos = operationPos;
       this.avroProjection = avroProjection;
      this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter(tableRowType, flinkConf.getBoolean(FlinkOptions.WRITE_UTC_TIMEZONE));
-      this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType);
+      this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType, flinkConf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE));
      this.projection = projection;
      this.instantRange = split.getInstantRange().orElse(null);
      List<String> mergers = Arrays.stream(flinkConf.getString(FlinkOptions.RECORD_MERGER_IMPLS).split(","))
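
[Context, not part of the patch] The constructor context above shows the asymmetry this commit closes: the writer side already threaded FlinkOptions.WRITE_UTC_TIMEZONE through RowDataToAvroConverters, while the reader side ignored READ_UTC_TIMEZONE. A hypothetical end-to-end usage sketch follows; the keys 'write.utc-timezone' and 'read.utc-timezone' are assumed to be the DDL names of the two FlinkOptions, and the table name and path are made up:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    class MorLocalTimezoneSketch {
      public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        tableEnv.executeSql(
            "CREATE TABLE t1 ("
                + " f0 INT PRIMARY KEY NOT ENFORCED,"
                + " f1 VARCHAR(10),"
                + " f2 TIMESTAMP(3)"
                + ") WITH ("
                + " 'connector' = 'hudi',"
                + " 'path' = 'file:///tmp/t1'," // hypothetical path
                + " 'table.type' = 'MERGE_ON_READ',"
                + " 'write.utc-timezone' = 'false'," // assumed key for WRITE_UTC_TIMEZONE
                + " 'read.utc-timezone' = 'false'"   // assumed key for READ_UTC_TIMEZONE
                + ")");
        // Before the fix, reads that hit MOR log files ignored 'read.utc-timezone';
        // with it, base-file and log-file reads agree on the timezone.
      }
    }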
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
index 38633b8ad9e..0caafca8259 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
@@ -43,6 +43,7 @@ import org.joda.time.DateTimeFieldType;
 import java.io.Serializable;
 import java.lang.reflect.Array;
 import java.nio.ByteBuffer;
+import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalTime;
@@ -72,12 +73,15 @@ public class AvroToRowDataConverters {
  // -------------------------------------------------------------------------------------
  // Runtime Converters
  // -------------------------------------------------------------------------------------
-
   public static AvroToRowDataConverter createRowConverter(RowType rowType) {
+    return createRowConverter(rowType, true);
+  }
+
+  public static AvroToRowDataConverter createRowConverter(RowType rowType, boolean utcTimezone) {
     final AvroToRowDataConverter[] fieldConverters =
         rowType.getFields().stream()
             .map(RowType.RowField::getType)
-            .map(AvroToRowDataConverters::createNullableConverter)
+            .map(type -> AvroToRowDataConverters.createNullableConverter(type, utcTimezone))
             .toArray(AvroToRowDataConverter[]::new);
     final int arity = rowType.getFieldCount();
 
@@ -94,8 +98,8 @@ public class AvroToRowDataConverters {
   /**
    * Creates a runtime converter which is null safe.
    */
-  private static AvroToRowDataConverter createNullableConverter(LogicalType type) {
-    final AvroToRowDataConverter converter = createConverter(type);
+  private static AvroToRowDataConverter createNullableConverter(LogicalType type, boolean utcTimezone) {
+    final AvroToRowDataConverter converter = createConverter(type, utcTimezone);
     return avroObject -> {
       if (avroObject == null) {
         return null;
@@ -107,7 +111,7 @@ public class AvroToRowDataConverters {
   /**
    * Creates a runtime converter which assuming input object is not null.
    */
-  public static AvroToRowDataConverter createConverter(LogicalType type) {
+  public static AvroToRowDataConverter createConverter(LogicalType type, boolean utcTimezone) {
     switch (type.getTypeRoot()) {
       case NULL:
         return avroObject -> null;
@@ -129,9 +133,9 @@ public class AvroToRowDataConverters {
       case TIME_WITHOUT_TIME_ZONE:
         return AvroToRowDataConverters::convertToTime;
       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-        return createTimestampConverter(((LocalZonedTimestampType) type).getPrecision());
+        return createTimestampConverter(((LocalZonedTimestampType) type).getPrecision(), true);
      case TIMESTAMP_WITHOUT_TIME_ZONE:
-        return createTimestampConverter(((TimestampType) type).getPrecision());
+        return createTimestampConverter(((TimestampType) type).getPrecision(), utcTimezone);
       case CHAR:
       case VARCHAR:
         return avroObject -> StringData.fromString(avroObject.toString());
@@ -141,12 +145,12 @@ public class AvroToRowDataConverters {
       case DECIMAL:
         return createDecimalConverter((DecimalType) type);
       case ARRAY:
-        return createArrayConverter((ArrayType) type);
+        return createArrayConverter((ArrayType) type, utcTimezone);
       case ROW:
-        return createRowConverter((RowType) type);
+        return createRowConverter((RowType) type, utcTimezone);
       case MAP:
       case MULTISET:
-        return createMapConverter(type);
+        return createMapConverter(type, utcTimezone);
       default:
         throw new UnsupportedOperationException("Unsupported type: " + type);
     }
@@ -170,9 +174,9 @@ public class AvroToRowDataConverters {
     };
   }
 
-  private static AvroToRowDataConverter createArrayConverter(ArrayType arrayType) {
+  private static AvroToRowDataConverter createArrayConverter(ArrayType arrayType, boolean utcTimezone) {
     final AvroToRowDataConverter elementConverter =
-        createNullableConverter(arrayType.getElementType());
+        createNullableConverter(arrayType.getElementType(), utcTimezone);
     final Class<?> elementClass =
         LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
 
@@ -187,11 +191,11 @@ public class AvroToRowDataConverters {
     };
   }
 
-  private static AvroToRowDataConverter createMapConverter(LogicalType type) {
+  private static AvroToRowDataConverter createMapConverter(LogicalType type, boolean utcTimezone) {
     final AvroToRowDataConverter keyConverter =
-        createConverter(DataTypes.STRING().getLogicalType());
+        createConverter(DataTypes.STRING().getLogicalType(), utcTimezone);
     final AvroToRowDataConverter valueConverter =
-        createNullableConverter(AvroSchemaConverter.extractValueTypeToAvroMap(type));
+        createNullableConverter(AvroSchemaConverter.extractValueTypeToAvroMap(type), utcTimezone);
 
     return avroObject -> {
       final Map<?, ?> map = (Map<?, ?>) avroObject;
@@ -205,7 +209,7 @@ public class AvroToRowDataConverters {
     };
   }
 
-  private static AvroToRowDataConverter createTimestampConverter(int precision) {
+  private static AvroToRowDataConverter createTimestampConverter(int precision, boolean utcTimezone) {
     final ChronoUnit chronoUnit;
     if (precision <= 3) {
       chronoUnit = ChronoUnit.MILLIS;
@@ -233,7 +237,11 @@
              "Unexpected object type for TIMESTAMP logical type. Received: " + avroObject);
         }
       }
-      return TimestampData.fromInstant(instant);
+      if (utcTimezone) {
+        return TimestampData.fromInstant(instant);
+      } else {
+        return TimestampData.fromTimestamp(Timestamp.from(instant)); // this applies the local timezone
+      }
     };
   }
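
[Context, not part of the patch] The last hunk above is the behavioral core: with utcTimezone=true the converter keeps the previous TimestampData.fromInstant path (wall clock taken in UTC); with false it routes through java.sql.Timestamp, which applies the JVM default zone. A minimal sketch of the difference, assuming a JVM default zone of UTC+7 (e.g. -Duser.timezone=Asia/Bangkok):

    import java.sql.Timestamp;
    import java.time.Instant;
    import org.apache.flink.table.data.TimestampData;

    class TimestampConversionSketch {
      public static void main(String[] args) {
        Instant instant = Instant.parse("2024-04-24T01:06:25Z");

        // utcTimezone = true: the wall clock is taken in UTC.
        System.out.println(TimestampData.fromInstant(instant));
        // -> 2024-04-24T01:06:25

        // utcTimezone = false: java.sql.Timestamp applies the JVM default zone (UTC+7 here).
        System.out.println(TimestampData.fromTimestamp(Timestamp.from(instant)));
        // -> 2024-04-24T08:06:25
      }
    }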
 
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index ac38d92a577..bf050fe399d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -480,7 +480,7 @@ public class ITTestHoodieDataSource {
   }
 
   @ParameterizedTest
-  @MethodSource("tableTypeAndPartitioningParams")
+  @MethodSource("tableTypeAndBooleanTrueFalseParams")
  void testStreamReadFilterByPartition(HoodieTableType tableType, boolean hiveStylePartitioning) throws Exception {
    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.setString(FlinkOptions.TABLE_NAME, "t1");
@@ -568,7 +568,7 @@ public class ITTestHoodieDataSource {
   }
 
   @ParameterizedTest
-  @MethodSource("tableTypeAndPartitioningParams")
+  @MethodSource("tableTypeAndBooleanTrueFalseParams")
  void testWriteAndReadWithProctimeSequence(HoodieTableType tableType, boolean hiveStylePartitioning) {
     TableEnvironment tableEnv = batchTableEnv;
     String hoodieTableDDL = sql("t1")
@@ -591,7 +591,7 @@ public class ITTestHoodieDataSource {
   }
 
   @ParameterizedTest
-  @MethodSource("tableTypeAndPartitioningParams")
+  @MethodSource("tableTypeAndBooleanTrueFalseParams")
  void testWriteAndReadWithProctimeSequenceWithTsColumnExisting(HoodieTableType tableType, boolean hiveStylePartitioning) {
     TableEnvironment tableEnv = batchTableEnv;
     String hoodieTableDDL = sql("t1")
@@ -641,7 +641,7 @@ public class ITTestHoodieDataSource {
   }
 
   @ParameterizedTest
-  @MethodSource("tableTypeAndPartitioningParams")
+  @MethodSource("tableTypeAndBooleanTrueFalseParams")
  void testBatchModeUpsert(HoodieTableType tableType, boolean hiveStylePartitioning) {
     TableEnvironment tableEnv = batchTableEnv;
     String hoodieTableDDL = sql("t1")
@@ -1836,8 +1836,8 @@ public class ITTestHoodieDataSource {
   }
 
   @ParameterizedTest
-  @EnumSource(value = HoodieTableType.class)
-  void testWriteReadWithTimestampWithoutTZ(HoodieTableType tableType) {
+  @MethodSource("tableTypeAndBooleanTrueFalseParams")
+  void testWriteReadWithTimestampWithoutTZ(HoodieTableType tableType, boolean readUtcTimezone) {
     TableEnvironment tableEnv = batchTableEnv;
     tableEnv.getConfig().setLocalTimeZone(ZoneId.of("America/Los_Angeles"));
     String createTable = sql("t1")
@@ -1849,8 +1849,7 @@ public class ITTestHoodieDataSource {
         .option(FlinkOptions.PRECOMBINE_FIELD, "f1")
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .option(FlinkOptions.WRITE_UTC_TIMEZONE, false)
-        //FlinkOptions.READ_UTC_TIMEZONE doesn't affect in MergeOnReadInputFormat since the option isn't supported in AvroToRowDataConverters
-        //.option(FlinkOptions.READ_UTC_TIMEZONE, false)
+        .option(FlinkOptions.READ_UTC_TIMEZONE, readUtcTimezone)
         .pkField("f0")
         .noPartition()
         .end();
@@ -1872,15 +1871,17 @@ public class ITTestHoodieDataSource {
     List<Row> result = CollectionUtil.iterableToList(
         () -> tableEnv.sqlQuery("select * from t1").execute().collect());
     formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss");
+
+    final ZoneId expectedZoneId = readUtcTimezone ? ZoneId.of("UTC") : ZoneId.systemDefault();
     final String expected = "["
         + "+I[1"
         + ", abc"
-        + ", " + formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 1000), ZoneId.of("UTC")))
-        + ", " + formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 2000), ZoneId.of("UTC"))) + "], "
+        + ", " + formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 1000), expectedZoneId))
+        + ", " + formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 2000), expectedZoneId)) + "], "
        + "+I[2"
        + ", def"
-        + ", " + formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 3000), ZoneId.of("UTC")))
-        + ", " + formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 4000), ZoneId.of("UTC"))) + "]]";
+        + ", " + formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 3000), expectedZoneId))
+        + ", " + formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis + 4000), expectedZoneId)) + "]]";
 
     assertRowsEquals(result, expected);
   }
@@ -2015,7 +2016,7 @@ public class ITTestHoodieDataSource {
   }
 
   @ParameterizedTest
-  @MethodSource("tableTypeAndPartitioningParams")
+  @MethodSource("tableTypeAndBooleanTrueFalseParams")
  void testDynamicPartitionPrune(HoodieTableType tableType, boolean hiveStylePartitioning) throws Exception {
    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.setString(FlinkOptions.TABLE_NAME, "t1");
@@ -2147,9 +2148,9 @@ public class ITTestHoodieDataSource {
   }
 
   /**
-   * Return test params => (HoodieTableType, hive style partitioning).
+   * Return test params => (HoodieTableType, true/false).
    */
-  private static Stream<Arguments> tableTypeAndPartitioningParams() {
+  private static Stream<Arguments> tableTypeAndBooleanTrueFalseParams() {
     Object[][] data =
         new Object[][] {
             {HoodieTableType.COPY_ON_WRITE, false},
