大家好

        关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 
StreamExecWatermarkAssigner
        在translateToPlanInternal 中生成了如下一个 class 代码,
public final class WatermarkGenerator$2 extends 
org.apache.flink.table.runtime.generated.WatermarkGenerator { public 
WatermarkGenerator$2(Object[] references) throws Exception { } @Override public 
void open(org.apache.flink.configuration.Configuration parameters) throws 
Exception { } @Override public Long 
currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws 
Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean 
isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp 
result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { field$3 
= row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 = null; if 
(!isNull$4) { result$5 = 
org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
 - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) { return 
null; } else { return result$5.getMillisecond(); } } @Override public void 
close() throws Exception { } } 
         


       其中关键的信息是 result$5 = 
org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
 - ((long) 10000L), field$3.getNanoOfMillisecond());
确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的 
watermark。
在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark 
这样的结果。因为这部分codegen的代码确实无法进一步debug了。
如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢

  Best forideal











在 2020-08-11 17:13:01,"forideal" <[email protected]> 写道:
>大家好,请教一个问题
>
>
>           我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 
> watermark。消费大量的数据的时候,就无法生成watermark。
>           一直是    No Watermark。 暂时找不到排查问题的思路。
>          Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 
> EventTime mode 模式,Blink Planner。
>|
>No Watermark |
>           SQL如下
>
>
>          DDL:
>           create table test(
>                       user_id varchar,
>                       action varchar,
>                       event_time TIMESTAMP(3),
>                       WATERMARK FOR event_time AS event_time - INTERVAL '10' 
> SECOND
>           ) with();
>
>
>          DML:
>insert into
>  console
>select
>  user_id,
>  f_get_str(bind_id) as id_list
>from
>  (
>    select
>      action as bind_id,
>      user_id,
>      event_time
>    from
>      (
>        SELECT
>          user_id,
>          action,
>          PROCTIME() as proc_time,
>          event_time
>        FROM
>          test
>      ) T
>    where
>      user_id is not null
>      and user_id <> ''
>      and CHARACTER_LENGTH(user_id) = 24
>  ) T
>group by
>  SESSION(event_time, INTERVAL '10' SECOND),
>  user_id
> 
>Best forideal

回复