hi, 你的意思是没有办法在codegen出来的代码上加断点的意思吗? 这里倒是有一个比较hack的方法: 将生成的类放在一个java文件之中,然后修改改下GeneratedClass下的newInstance方法,如果classname == “WatermarkGenerator$2” 则将刚才的类则返回 new WatermarkGenerator$2 这个类。
我个人对于问题的猜测是有一条数据的rowtime远远晚于其他数据,从而将整体的watermark提得很高,导致后面的“晚到”的数据一直无法触发watermark的生成。 forideal <[email protected]> 于2020年8月13日周四 下午12:57写道: > 大家好 > > > 关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 > StreamExecWatermarkAssigner > 在translateToPlanInternal 中生成了如下一个 class 代码, > public final class WatermarkGenerator$2 extends > org.apache.flink.table.runtime.generated.WatermarkGenerator { public > WatermarkGenerator$2(Object[] references) throws Exception { } @Override > public void open(org.apache.flink.configuration.Configuration parameters) > throws Exception { } @Override public Long > currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws > Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean > isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp > result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { > field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 > = null; if (!isNull$4) { result$5 = > org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() > - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) { > return null; } else { return result$5.getMillisecond(); } } @Override > public void close() throws Exception { } } > > > > 其中关键的信息是 result$5 = > org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() > - ((long) 10000L), field$3.getNanoOfMillisecond()); > 确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的 > watermark。 > 在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark > 这样的结果。因为这部分codegen的代码确实无法进一步debug了。 > 如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢 > > Best forideal > > > > > > > > > > > > 在 2020-08-11 17:13:01,"forideal" <[email protected]> 写道: > >大家好,请教一个问题 > > > > > > 我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 > watermark。消费大量的数据的时候,就无法生成watermark。 > > 一直是 No Watermark。 暂时找不到排查问题的思路。 > > Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 > EventTime mode 模式,Blink Planner。 > >| > >No Watermark | > > SQL如下 > > > > > > DDL: > > create table test( > > user_id varchar, > > action varchar, > > event_time TIMESTAMP(3), > > WATERMARK FOR event_time AS event_time - INTERVAL > '10' SECOND > > ) with(); > > > > > > DML: > >insert into > > console > >select > > user_id, > > f_get_str(bind_id) as id_list > >from > > ( > > select > > action as bind_id, > > user_id, > > event_time > > from > > ( > > SELECT > > user_id, > > action, > > PROCTIME() as proc_time, > > event_time > > FROM > > test > > ) T > > where > > user_id is not null > > and user_id <> '' > > and CHARACTER_LENGTH(user_id) = 24 > > ) T > >group by > > SESSION(event_time, INTERVAL '10' SECOND), > > user_id > > > >Best forideal >
