hi 那你有没有试过将并行度设置为partition的数量

Zhou Zach <wander...@163.com>于2020年8月13日 周四下午3:21写道:

>
>
>
> Hi forideal,
> 我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下:
>
>
>     val streamExecutionEnv =
> StreamExecutionEnvironment.getExecutionEnvironment
>
> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     streamExecutionEnv.setStateBackend(new
> RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints"))
>
>     val blinkEnvSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>     val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv,
> blinkEnvSettings)
>
> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE)
>
> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20))
>
> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900))
>
>
> streamTableEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,"5s")
>
>
> 并且,任务的并行度设置了1(这样是不是就不会存在flink consumer不消费kafka数据的情况,kafka一直生产数据的前提下)
> 在flink ui上,仍然显示Watermark No data,问下,你是怎么设置参数的
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-08-13 14:02:58,"forideal" <fszw...@163.com> 写道:
> >大家好
> >
> >       问题的原因定位到了。
> >       由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。
> >       这个时候,我进行了 disable chain,观察 watermark 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个
> op chain 在一起,不能确定到底是那个环节存在问题)
> >       发现在  WatermarkAssigner(rowtime=[event_time],
> watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source
> chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark
> 为 min(parent task output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。
> >       定位到由于 kafka 部分 partition 无数据导致 No watermark 加上
> table.exec.source.idle-timeout = 10s 参数即可。
> >       当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task
> 可以生成 watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用  disable chain
> 观察每个 op 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。
> >
> >
> >Best forideal
> >
> >
> >
> >
> >
> >
> >
> >
> >在 2020-08-13 12:56:57,"forideal" <fszw...@163.com> 写道:
> >>大家好
> >>
> >>
> >>        关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是
> StreamExecWatermarkAssigner
> >>        在translateToPlanInternal 中生成了如下一个 class 代码,
> >>public final class WatermarkGenerator$2 extends
> org.apache.flink.table.runtime.generated.WatermarkGenerator { public
> WatermarkGenerator$2(Object[] references) throws Exception { } @Override
> public void open(org.apache.flink.configuration.Configuration parameters)
> throws Exception { } @Override public Long
> currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws
> Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean
> isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp
> result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) {
> field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5
> = null; if (!isNull$4) { result$5 =
> org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
> - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) {
> return null; } else { return result$5.getMillisecond(); } } @Override
> public void close() throws Exception { } }
> >>
> >>
> >>
> >>       其中关键的信息是 result$5 =
> org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
> - ((long) 10000L), field$3.getNanoOfMillisecond());
> >>确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
> 的定义获取的 watermark。
> >>在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark
> 这样的结果。因为这部分codegen的代码确实无法进一步debug了。
> >>如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢
> >>
> >>  Best forideal
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>在 2020-08-11 17:13:01,"forideal" <fszw...@163.com> 写道:
> >>>大家好,请教一个问题
> >>>
> >>>
> >>>           我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成
> watermark。消费大量的数据的时候,就无法生成watermark。
> >>>           一直是    No Watermark。 暂时找不到排查问题的思路。
> >>>          Flink 版本号是 1.10,kafka
> 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 EventTime mode 模式,Blink Planner。
> >>>|
> >>>No Watermark |
> >>>           SQL如下
> >>>
> >>>
> >>>          DDL:
> >>>           create table test(
> >>>                       user_id varchar,
> >>>                       action varchar,
> >>>                       event_time TIMESTAMP(3),
> >>>                       WATERMARK FOR event_time AS event_time -
> INTERVAL '10' SECOND
> >>>           ) with();
> >>>
> >>>
> >>>          DML:
> >>>insert into
> >>>  console
> >>>select
> >>>  user_id,
> >>>  f_get_str(bind_id) as id_list
> >>>from
> >>>  (
> >>>    select
> >>>      action as bind_id,
> >>>      user_id,
> >>>      event_time
> >>>    from
> >>>      (
> >>>        SELECT
> >>>          user_id,
> >>>          action,
> >>>          PROCTIME() as proc_time,
> >>>          event_time
> >>>        FROM
> >>>          test
> >>>      ) T
> >>>    where
> >>>      user_id is not null
> >>>      and user_id <> ''
> >>>      and CHARACTER_LENGTH(user_id) = 24
> >>>  ) T
> >>>group by
> >>>  SESSION(event_time, INTERVAL '10' SECOND),
> >>>  user_id
> >>>
> >>>Best forideal
>

回复