大家好

       问题的原因定位到了。
       由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。
       这个时候,我进行了 disable chain,观察 watermark 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 op 
chain 在一起,不能确定到底是那个环节存在问题)
       发现在  WatermarkAssigner(rowtime=[event_time], watermark=[(event_ti...)这个 
op 中部分 task 为 No watermark,由于这个op和source chain在一起,导致这个vertex 
对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark 为 min(parent task 
output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。
       定位到由于 kafka 部分 partition 无数据导致 No watermark 加上  
table.exec.source.idle-timeout = 10s 参数即可。
       当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task 可以生成 
watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用  disable chain 观察每个 op 
的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。


Best forideal








在 2020-08-13 12:56:57,"forideal" <[email protected]> 写道:
>大家好
>
>
>        关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 
> StreamExecWatermarkAssigner
>        在translateToPlanInternal 中生成了如下一个 class 代码,
>public final class WatermarkGenerator$2 extends 
>org.apache.flink.table.runtime.generated.WatermarkGenerator { public 
>WatermarkGenerator$2(Object[] references) throws Exception { } @Override 
>public void open(org.apache.flink.configuration.Configuration parameters) 
>throws Exception { } @Override public Long 
>currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws 
>Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean 
>isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp 
>result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { 
>field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 = 
>null; if (!isNull$4) { result$5 = 
>org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
> - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) { return 
>null; } else { return result$5.getMillisecond(); } } @Override public void 
>close() throws Exception { } } 
>         
>
>
>       其中关键的信息是 result$5 = 
> org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
>  - ((long) 10000L), field$3.getNanoOfMillisecond());
>确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的 
>watermark。
>在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark 
>这样的结果。因为这部分codegen的代码确实无法进一步debug了。
>如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢
>
>  Best forideal
>
>
>
>
>
>
>
>
>
>
>
>在 2020-08-11 17:13:01,"forideal" <[email protected]> 写道:
>>大家好,请教一个问题
>>
>>
>>           我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 
>> watermark。消费大量的数据的时候,就无法生成watermark。
>>           一直是    No Watermark。 暂时找不到排查问题的思路。
>>          Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 
>> EventTime mode 模式,Blink Planner。
>>|
>>No Watermark |
>>           SQL如下
>>
>>
>>          DDL:
>>           create table test(
>>                       user_id varchar,
>>                       action varchar,
>>                       event_time TIMESTAMP(3),
>>                       WATERMARK FOR event_time AS event_time - INTERVAL '10' 
>> SECOND
>>           ) with();
>>
>>
>>          DML:
>>insert into
>>  console
>>select
>>  user_id,
>>  f_get_str(bind_id) as id_list
>>from
>>  (
>>    select
>>      action as bind_id,
>>      user_id,
>>      event_time
>>    from
>>      (
>>        SELECT
>>          user_id,
>>          action,
>>          PROCTIME() as proc_time,
>>          event_time
>>        FROM
>>          test
>>      ) T
>>    where
>>      user_id is not null
>>      and user_id <> ''
>>      and CHARACTER_LENGTH(user_id) = 24
>>  ) T
>>group by
>>  SESSION(event_time, INTERVAL '10' SECOND),
>>  user_id
>> 
>>Best forideal

回复