Hi,

Flink 1.12.1
Scala 2.12.12

While attempting to fix a serialization bug I previously wrote about, I
temporarily disabled projection pushdown for my custom source
implementation. When I then ran the application, I hit a
ClassCastException. Debugging showed that the auto-generated
WatermarkGenerator code was trying to access the rowtime at the wrong
index (0 instead of 1).

My program is as follows:

TableSource -> SQL query over table -> Sink

Debugging the issue, I came across ProjectWatermarkAssignerTransposeRule,
which finds unused fields in the table, prunes them and re-orders the
remaining fields with new indexes:
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRule.java

Now, since I *did not* use projection pushdown (which would probably have
fixed the issue and caused the field indexes to be re-ordered), the wrong
index was accessed.

You can see the reordering in the plan:

Initially:

         +- LogicalProject(event_time=[$1])
            +- LogicalWatermarkAssigner(rowtime=[event_time], watermark=[$1])
               +- LogicalTableScan(table=[[foo-catalog, default-db, foo-table]])

After optimization:

         +- FlinkLogicalWatermarkAssigner(rowtime=[event_time], watermark=[$0])
            +- FlinkLogicalTableSourceScan(table=[[foo-catalog, default-db, foo-table, project=[event_time])
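
To make the mismatch concrete, here is a toy model of how I understand it,
in plain Python rather than Flink code. The two-field schema, the sample
row values and the prune helper are all made up for illustration; the only
thing taken from the plans above is that event_time sits at index 1 before
pruning and at index 0 after.

```python
# Toy model of the index mismatch; this is NOT Flink code.
# Hypothetical schema: only "event_time at index 1" comes from the plan.
schema = ["id", "event_time"]


def prune(schema, used):
    """Drop unused fields; return the pruned schema and an old->new index map."""
    kept = [i for i, name in enumerate(schema) if name in used]
    mapping = {old: new for new, old in enumerate(kept)}
    return [schema[i] for i in kept], mapping


pruned_schema, mapping = prune(schema, {"event_time"})

# watermark=[$1] in the initial plan becomes watermark=[$0] after pruning.
rowtime_index = mapping[1]

# Hypothetical rows: what the source emits without pushdown vs. what the
# planner assumes it emits after pruning.
full_row = ("some-id", 1_600_000_000_000)
pruned_row = (1_600_000_000_000,)

# With pushdown disabled the source still emits full rows, so the remapped
# index points at the id field instead of the rowtime.
wrong_value = full_row[rowtime_index]
right_value = pruned_row[rowtime_index]
```

In this sketch wrong_value is a string where a timestamp is expected, which
is the same kind of type mismatch I would expect to surface as the
ClassCastException.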

Is this the desired behavior? I was quite surprised by this, and by the
fact that the pruning happens regardless of whether the underlying table is
a TableSource or whether it actually filters these unused fields out of the
query result.


-- 
Best Regards,
Yuval Itzchakov.
