Hi Zhou Zach:
      
“但是,只有最后面两个算子有watermark,所以开启OperatorChaining后,因为前面3个没有watermark,整个chain的算子都没有watermark了,那么是不是就不能通过flink
 ui来监控watermark了,就依赖第三方监控工具来看watermark?因为上生产,肯定要开OperatorChaining的”
  
      关于这个问题,我昨天也和李本超进行了线下沟通,大概的结论是:
       >1.如果不直接看每个operator的metrics,只看 flink ui 那个 graph 图,不进行 disable chain 
的话,是看不出来问题的。如果打开了disable chain 有可能能看出来问题。在我们这个场景下能看出来问题。其他场景可能会不那么直接。
       >2.我们打开了disable chain 看 flink ui,其实也是看的相关的 metric。


       总体来看,就是要用 metrics 来诊断问题。不过,在某些场景下,一个用户开发了一个 Flink SQL 然后,去看监控,也增加了对应的 
cost。一个是说这个用户之前不怎么看metrics,一个是说平台metrics也做的不好。所以如果能在 Flink ui 
上面解决一定量的问题,将能减少用户的成本。


Best forideal


      

















在 2020-08-13 16:33:29,"Zhou Zach" <wander...@163.com> 写道:
>
>
>
>Hi forideal, Shengkai Fang,
>   
>加上env.disableOperatorChaining()之后,发现5个算子,
>
>
>
>
>Source: TableSourceScan(table=[[default_catalog, default_database, user]], 
>fields=[uid, sex, age, created_time]) -> 
>
>Calc(select=[uid, sex, age, created_time, () AS procTime, 
>TO_TIMESTAMP(((created_time / 1000) FROM_UNIXTIME _UTF-16LE'yyyy-MM-dd 
>HH:mm:ss')) AS eventTime]) -> 
>
>WatermarkAssigner(rowtime=[eventTime], watermark=[(eventTime - 3000:INTERVAL 
>SECOND)]) -> 
>
>Calc(select=[uid, sex, age, created_time]) -> 
>
>Sink: Sink(table=[default_catalog.default_database.user_mysql], fields=[uid, 
>sex, age, created_time])
>但是,只有最后面两个算子有watermark,所以开启OperatorChaining后,因为前面3个没有watermark,整个chain的算子都没有watermark了,那么是不是就不能通过flink
> ui来监控watermark了,就依赖第三方监控工具来看watermark?因为上生产,肯定要开OperatorChaining的
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-08-13 15:39:44,"forideal" <fszw...@163.com> 写道:
>>Hi Zhou Zach:
>>你可以试试 env.disableOperatorChaining();
>>然后观察每个 op 的 watermark 情况。这样能够简单的看下具体的情况。
>>> 我是怎么设置参数的
>>我使用的是 Flink SQL Blink Planner,采用的设置方式和你一样
>>tableEnv.getConfig().getConfiguration() .setString(key, 
>>configs.getString(key, null));
>>同时我在 source table 中定义了 WATERMARK FOR event_time AS event_time - INTERVAL '10' 
>>SECOND
>>
>>Best forideal
>>
>>
>>在 2020-08-13 15:20:13,"Zhou Zach" <wander...@163.com> 写道:
>>>
>>>
>>>
>>>Hi forideal,
>>>我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下:
>>>
>>>
>>>    val streamExecutionEnv = 
>>> StreamExecutionEnvironment.getExecutionEnvironment
>>>    
>>> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>    streamExecutionEnv.setStateBackend(new 
>>> RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints"))
>>>
>>>    val blinkEnvSettings = 
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>    val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, 
>>> blinkEnvSettings)
>>>    
>>> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE)
>>>    
>>> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20))
>>>    
>>> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900))
>>>
>>>    
>>> streamTableEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,"5s")
>>>
>>>
>>>并且,任务的并行度设置了1(这样是不是就不会存在flink consumer不消费kafka数据的情况,kafka一直生产数据的前提下)
>>>在flink ui上,仍然显示Watermark No data,问下,你是怎么设置参数的
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>在 2020-08-13 14:02:58,"forideal" <fszw...@163.com> 写道:
>>>>大家好
>>>>
>>>>       问题的原因定位到了。
>>>>       由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。
>>>>       这个时候,我进行了 disable chain,观察 watermark 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 
>>>> op chain 在一起,不能确定到底是那个环节存在问题)
>>>>       发现在  WatermarkAssigner(rowtime=[event_time], 
>>>> watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source 
>>>> chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 
>>>> watermark 为 min(parent task output watermark),所以下游是 No 
>>>> watermark。导致在查问题的时候,比较困难。
>>>>       定位到由于 kafka 部分 partition 无数据导致 No watermark 加上  
>>>> table.exec.source.idle-timeout = 10s 参数即可。
>>>>       当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task 可以生成 
>>>> watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用  disable chain 观察每个 op 
>>>> 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。
>>>>
>>>>
>>>>Best forideal
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>在 2020-08-13 12:56:57,"forideal" <fszw...@163.com> 写道:
>>>>>大家好
>>>>>
>>>>>
>>>>>        关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 
>>>>> StreamExecWatermarkAssigner
>>>>>        在translateToPlanInternal 中生成了如下一个 class 代码,
>>>>>public final class WatermarkGenerator$2 extends 
>>>>>org.apache.flink.table.runtime.generated.WatermarkGenerator { public 
>>>>>WatermarkGenerator$2(Object[] references) throws Exception { } @Override 
>>>>>public void open(org.apache.flink.configuration.Configuration parameters) 
>>>>>throws Exception { } @Override public Long 
>>>>>currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws 
>>>>>Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; 
>>>>>boolean isNull$3; boolean isNull$4; 
>>>>>org.apache.flink.table.dataformat.SqlTimestamp result$5; isNull$3 = 
>>>>>row.isNullAt(12); field$3 = null; if (!isNull$3) { field$3 = 
>>>>>row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 = null; 
>>>>>if (!isNull$4) { result$5 = 
>>>>>org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
>>>>> - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) { 
>>>>>return null; } else { return result$5.getMillisecond(); } } @Override 
>>>>>public void close() throws Exception { } } 
>>>>>         
>>>>>
>>>>>
>>>>>       其中关键的信息是 result$5 = 
>>>>> org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
>>>>>  - ((long) 10000L), field$3.getNanoOfMillisecond());
>>>>>确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的 
>>>>>watermark。
>>>>>在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark 
>>>>>这样的结果。因为这部分codegen的代码确实无法进一步debug了。
>>>>>如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢
>>>>>
>>>>>  Best forideal
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>在 2020-08-11 17:13:01,"forideal" <fszw...@163.com> 写道:
>>>>>>大家好,请教一个问题
>>>>>>
>>>>>>
>>>>>>           我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 
>>>>>> watermark。消费大量的数据的时候,就无法生成watermark。
>>>>>>           一直是    No Watermark。 暂时找不到排查问题的思路。
>>>>>>          Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 
>>>>>> EventTime mode 模式,Blink Planner。
>>>>>>|
>>>>>>No Watermark |
>>>>>>           SQL如下
>>>>>>
>>>>>>
>>>>>>          DDL:
>>>>>>           create table test(
>>>>>>                       user_id varchar,
>>>>>>                       action varchar,
>>>>>>                       event_time TIMESTAMP(3),
>>>>>>                       WATERMARK FOR event_time AS event_time - INTERVAL 
>>>>>> '10' SECOND
>>>>>>           ) with();
>>>>>>
>>>>>>
>>>>>>          DML:
>>>>>>insert into
>>>>>>  console
>>>>>>select
>>>>>>  user_id,
>>>>>>  f_get_str(bind_id) as id_list
>>>>>>from
>>>>>>  (
>>>>>>    select
>>>>>>      action as bind_id,
>>>>>>      user_id,
>>>>>>      event_time
>>>>>>    from
>>>>>>      (
>>>>>>        SELECT
>>>>>>          user_id,
>>>>>>          action,
>>>>>>          PROCTIME() as proc_time,
>>>>>>          event_time
>>>>>>        FROM
>>>>>>          test
>>>>>>      ) T
>>>>>>    where
>>>>>>      user_id is not null
>>>>>>      and user_id <> ''
>>>>>>      and CHARACTER_LENGTH(user_id) = 24
>>>>>>  ) T
>>>>>>group by
>>>>>>  SESSION(event_time, INTERVAL '10' SECOND),
>>>>>>  user_id
>>>>>> 
>>>>>>Best forideal

回复