[jira] [Commented] (FLINK-33129) Can't create RowDataToAvroConverter for LocalZonedTimestampType logical type
[ https://issues.apache.org/jira/browse/FLINK-33129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17769352#comment-17769352 ] Zhaoyang Shao commented on FLINK-33129: --- Hi [~jadireddi] thanks for working on the converter for `TIMESTAMP_LTZ` Looks like you already had a PR for https://issues.apache.org/jira/browse/FLINK-30483. Do you mind addressing this issue together with your PR? > Can't create RowDataToAvroConverter for LocalZonedTimestampType logical type > > > Key: FLINK-33129 > URL: https://issues.apache.org/jira/browse/FLINK-33129 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.17.1 >Reporter: Zhaoyang Shao >Priority: Critical > Fix For: 1.17.1 > > Original Estimate: 1h > Remaining Estimate: 1h > > While creating converter using `RowDataToAvroConverters.createConverter` with > LocalZonedTimestampType logical type, the method will throw exception. This > is because the switch clause is missing a clause for > `LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZON`. > Code: > [https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java#L75] > > We can convert the value to `LocalDateTime` and then `TimestampData` using > method below. Then we can apply the same converter as > TIMESTAMP_WITHOUT_TIME_ZONE? > > `TimestampData fromLocalDateTime(LocalDateTime dateTime)` > Can Flink team help adding the support for this logical type and logical type > root? > This is now a blocker for creating Flink Iceberg consumer with Avro > GenericRecord when IcebergTable has `TimestampTZ` type field which will be > converted to LocalZonedTimestampType. > See error below: > Unsupported type: TIMESTAMP_LTZ(6) > stack: [ [-] > > org.apache.flink.formats.avro.RowDataToAvroConverters.createConverter(RowDataToAvroConverters.java:186) > > > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) > > > java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) > > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) > > java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:550) > > java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) > > > java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:517) > > org.apache.flink.formats.avro.RowDataToAvroConverters.createRowConverter(RowDataToAvroConverters.java:224) > > > org.apache.flink.formats.avro.RowDataToAvroConverters.createConverter(RowDataToAvroConverters.java:178) > > > org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter.(RowDataToAvroGenericRecordConverter.java:46) > > > org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter.fromIcebergSchema(RowDataToAvroGenericRecordConverter.java:60) > > > org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction.lazyConverter(AvroGenericRecordReaderFunction.java:93) > > > org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction.createDataIterator(AvroGenericRecordReaderFunction.java:85) > > > org.apache.iceberg.flink.source.reader.DataIteratorReaderFunction.apply(DataIteratorReaderFunction.java:39) > > > org.apache.iceberg.flink.source.reader.DataIteratorReaderFunction.apply(DataIteratorReaderFunction.java:27) > > > org.apache.iceberg.flink.source.reader.IcebergSourceSplitReader.fetch(IcebergSourceSplitReader.java:74) > > > org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) > > > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162) > > > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114) > > > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > java.util.concurrent.FutureTask.run(FutureTask.java:264) > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > > java.lang.Thread.run(Thread.java:829) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33129) Can't create RowDataToAvroConverter for LocalZonedTimestampType logical type
[ https://issues.apache.org/jira/browse/FLINK-33129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17768959#comment-17768959 ] Zhaoyang Shao commented on FLINK-33129: --- In DataStructureConverters, there is similar implementation already using ``` putConverter( LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, Integer.class, constructor(LocalZonedTimestampIntConverter::new)); ``` [https://github.com/apache/flink/blob/9b2b4e3f194467aae0d299b3b403e0ca60c42ef0/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java#L134] We can use same/smiliar approach to support TIMESTAMP_WITH_LOCAL_TIME_ZONE in `RowDataToAvroConverters.createConverter` > Can't create RowDataToAvroConverter for LocalZonedTimestampType logical type > > > Key: FLINK-33129 > URL: https://issues.apache.org/jira/browse/FLINK-33129 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.17.1 >Reporter: Zhaoyang Shao >Priority: Critical > Fix For: 1.17.1 > > Original Estimate: 1h > Remaining Estimate: 1h > > While creating converter using `RowDataToAvroConverters.createConverter` with > LocalZonedTimestampType logical type, the method will throw exception. This > is because the switch clause is missing a clause for > `LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZON`. > Code: > [https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java#L75] > > We can convert the value to `LocalDateTime` and then `TimestampData` using > method below. Then we can apply the same converter as > TIMESTAMP_WITHOUT_TIME_ZONE? > > `TimestampData fromLocalDateTime(LocalDateTime dateTime)` > Can Flink team help adding the support for this logical type and logical type > root? > This is now a blocker for creating Flink Iceberg consumer with Avro > GenericRecord when IcebergTable has `TimestampTZ` type field which will be > converted to LocalZonedTimestampType. > See error below: > Unsupported type: TIMESTAMP_LTZ(6) > stack: [ [-] > > org.apache.flink.formats.avro.RowDataToAvroConverters.createConverter(RowDataToAvroConverters.java:186) > > > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) > > > java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) > > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) > > java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:550) > > java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) > > > java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:517) > > org.apache.flink.formats.avro.RowDataToAvroConverters.createRowConverter(RowDataToAvroConverters.java:224) > > > org.apache.flink.formats.avro.RowDataToAvroConverters.createConverter(RowDataToAvroConverters.java:178) > > > org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter.(RowDataToAvroGenericRecordConverter.java:46) > > > org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter.fromIcebergSchema(RowDataToAvroGenericRecordConverter.java:60) > > > org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction.lazyConverter(AvroGenericRecordReaderFunction.java:93) > > > org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction.createDataIterator(AvroGenericRecordReaderFunction.java:85) > > > org.apache.iceberg.flink.source.reader.DataIteratorReaderFunction.apply(DataIteratorReaderFunction.java:39) > > > org.apache.iceberg.flink.source.reader.DataIteratorReaderFunction.apply(DataIteratorReaderFunction.java:27) > > > org.apache.iceberg.flink.source.reader.IcebergSourceSplitReader.fetch(IcebergSourceSplitReader.java:74) > > > org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) > > > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162) > > > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114) > > > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > java.util.concurrent.FutureTask.run(FutureTask.java:264) > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > >
[jira] [Commented] (FLINK-33129) Can't create RowDataToAvroConverter for LocalZonedTimestampType logical type
[ https://issues.apache.org/jira/browse/FLINK-33129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17768195#comment-17768195 ] Zhaoyang Shao commented on FLINK-33129: --- [~danny0405] can you help take a look? as I saw you are the original author > Can't create RowDataToAvroConverter for LocalZonedTimestampType logical type > > > Key: FLINK-33129 > URL: https://issues.apache.org/jira/browse/FLINK-33129 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.17.1 >Reporter: Zhaoyang Shao >Priority: Blocker > Fix For: 1.17.1 > > Original Estimate: 1h > Remaining Estimate: 1h > > While creating converter using `RowDataToAvroConverters.createConverter` with > LocalZonedTimestampType logical type, the method will throw exception. This > is because the switch clause is missing a clause for > `LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZON`. > Code: > [https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java#L75] > > We can convert the value to `LocalDateTime` and then `TimestampData` using > method below. Then we can apply the same converter as > TIMESTAMP_WITHOUT_TIME_ZONE? > > `TimestampData fromLocalDateTime(LocalDateTime dateTime)` > Can Flink team help adding the support for this logical type and logical type > root? > This is now a blocker for creating Flink Iceberg consumer with Avro > GenericRecord when IcebergTable has `TimestampTZ` type field which will be > converted to LocalZonedTimestampType. > See error below: > Unsupported type: TIMESTAMP_LTZ(6) > stack: [ [-] > > org.apache.flink.formats.avro.RowDataToAvroConverters.createConverter(RowDataToAvroConverters.java:186) > > > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) > > > java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) > > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) > > java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:550) > > java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) > > > java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:517) > > org.apache.flink.formats.avro.RowDataToAvroConverters.createRowConverter(RowDataToAvroConverters.java:224) > > > org.apache.flink.formats.avro.RowDataToAvroConverters.createConverter(RowDataToAvroConverters.java:178) > > > org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter.(RowDataToAvroGenericRecordConverter.java:46) > > > org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter.fromIcebergSchema(RowDataToAvroGenericRecordConverter.java:60) > > > org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction.lazyConverter(AvroGenericRecordReaderFunction.java:93) > > > org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction.createDataIterator(AvroGenericRecordReaderFunction.java:85) > > > org.apache.iceberg.flink.source.reader.DataIteratorReaderFunction.apply(DataIteratorReaderFunction.java:39) > > > org.apache.iceberg.flink.source.reader.DataIteratorReaderFunction.apply(DataIteratorReaderFunction.java:27) > > > org.apache.iceberg.flink.source.reader.IcebergSourceSplitReader.fetch(IcebergSourceSplitReader.java:74) > > > org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) > > > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162) > > > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114) > > > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > java.util.concurrent.FutureTask.run(FutureTask.java:264) > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > > java.lang.Thread.run(Thread.java:829) -- This message was sent by Atlassian Jira (v8.20.10#820010)