Hi,

Flink 1.12.1, Scala 2.12.12
While attempting to fix a serialization bug I previously wrote about, I temporarily disabled projection pushdown for my custom source implementation. When I then ran the application, I hit a ClassCastException. After debugging, it turned out that the auto-generated WatermarkGenerator code was trying to access the rowtime at the wrong index (0 instead of 1).

My program is as follows:

TableSource -> SQL query over table -> Sink

While debugging the issue, I came across ProjectWatermarkAssignerTransposeRule, which identifies unused fields in the table, prunes them and re-orders the remaining fields with new indexes (https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRule.java).

Now, since I *did not* use projection pushdown (which would probably have fixed the issue, because the source would then have re-ordered the field indexes accordingly), the wrong index was accessed. You can see the reordering in the plan:

Initially:

+- LogicalProject(event_time=[$1])
   +- LogicalWatermarkAssigner(rowtime=[event_time], watermark=[$1])
      +- LogicalTableScan(table=[[foo-catalog, default-db, foo-table]])

After optimization:

+- FlinkLogicalWatermarkAssigner(rowtime=[event_time], watermark=[$0])
   +- FlinkLogicalTableSourceScan(table=[[foo-catalog, default-db, foo-table, project=[event_time])

Is this the desired behavior? I was quite surprised by this, and by the fact that the pruning happens regardless of whether the underlying table is a TableSource or whether it actually filters these unused fields out of the query result.

--
Best Regards,
Yuval Itzchakov.
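
P.S. To make the index mismatch concrete, here is a minimal, Flink-free sketch of what I believe is happening. It is only an illustration under assumptions: the three-field schema (id, event_time, payload), the class name and both helper methods are hypothetical and are not part of Flink or of my actual source. It shows how pruning to event_time moves the rowtime from index 1 to index 0, and how reading index 0 from a row that was never actually pruned fails with a ClassCastException.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Flink.
class ProjectionIndexSketch {

    // For each original field, returns its position in the pruned row,
    // or -1 if the field was pruned away.
    static int[] remap(List<String> originalFields, List<String> projectedFields) {
        int[] mapping = new int[originalFields.size()];
        for (int i = 0; i < originalFields.size(); i++) {
            mapping[i] = projectedFields.indexOf(originalFields.get(i));
        }
        return mapping;
    }

    // Mimics generated code that expects a long rowtime at a fixed index.
    static long readRowtime(Object[] row, int index) {
        return (Long) row[index];
    }

    public static void main(String[] args) {
        // Assumed schema; only event_time at index 1 is taken from the plan.
        List<String> original = Arrays.asList("id", "event_time", "payload");
        List<String> projected = Arrays.asList("event_time");

        int[] mapping = remap(original, projected);
        System.out.println(Arrays.toString(mapping)); // prints [-1, 0, -1]

        // The source ignored the projection, so the row still has all fields.
        Object[] unprunedRow = {"abc", 1000L, "x"};

        // The planner assumed pruning happened and uses the remapped index 0.
        try {
            readRowtime(unprunedRow, mapping[1]);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: rowtime read at index " + mapping[1]);
        }

        // Reading at the original index 1 works on the unpruned row.
        System.out.println(readRowtime(unprunedRow, 1)); // prints 1000
    }
}
```

In other words, the remapped index is only valid if the source really emits the pruned row, which is exactly the assumption my source broke when I disabled projection pushdown.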