Hi,试了,将并行度设置为2和kafka分区数9,都试了,都只有一个consumer有watermark,可能是因为我开了一个producer吧














在 2020-08-13 16:57:25,"Shengkai Fang" <fskm...@gmail.com> 写道:
>hi, watermark本来就是通过watermark assigner生成的。这是正常现象。
>我想问问 你有没有试过调大并行度来解决这个问题?因为不同partition的数据可能存在时间上的差异。
>
>Zhou Zach <wander...@163.com> 于2020年8月13日周四 下午4:33写道:
>
>>
>>
>>
>> Hi forideal, Shengkai Fang,
>>
>> 加上env.disableOperatorChaining()之后,发现5个算子,
>>
>>
>>
>>
>> Source: TableSourceScan(table=[[default_catalog, default_database, user]],
>> fields=[uid, sex, age, created_time]) ->
>>
>> Calc(select=[uid, sex, age, created_time, () AS procTime,
>> TO_TIMESTAMP(((created_time / 1000) FROM_UNIXTIME _UTF-16LE'yyyy-MM-dd
>> HH:mm:ss')) AS eventTime]) ->
>>
>> WatermarkAssigner(rowtime=[eventTime], watermark=[(eventTime -
>> 3000:INTERVAL SECOND)]) ->
>>
>> Calc(select=[uid, sex, age, created_time]) ->
>>
>> Sink: Sink(table=[default_catalog.default_database.user_mysql],
>> fields=[uid, sex, age, created_time])
>> 但是,只有最后面两个算子有watermark,所以开启OperatorChaining后,因为前面3个没有watermark,整个chain的算子都没有watermark了,那么是不是就不能通过flink
>> ui来监控watermark了,就依赖第三方监控工具来看watermark?因为上生产,肯定要开OperatorChaining的
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-08-13 15:39:44,"forideal" <fszw...@163.com> 写道:
>> >Hi Zhou Zach:
>> >你可以试试 env.disableOperatorChaining();
>> >然后观察每个 op 的 watermark 情况。这样能够简单的看下具体的情况。
>> >> 我是怎么设置参数的
>> >我使用的是 Flink SQL Blink Planner,采用的设置方式和你一样
>> >tableEnv.getConfig().getConfiguration() .setString(key,
>> configs.getString(key, null));
>> >同时我在 source table 中定义了 WATERMARK FOR event_time AS event_time - INTERVAL
>> '10' SECOND
>> >
>> >Best forideal
>> >
>> >
>> >在 2020-08-13 15:20:13,"Zhou Zach" <wander...@163.com> 写道:
>> >>
>> >>
>> >>
>> >>Hi forideal,
>> >>我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下:
>> >>
>> >>
>> >>    val streamExecutionEnv =
>> StreamExecutionEnvironment.getExecutionEnvironment
>> >>
>> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> >>    streamExecutionEnv.setStateBackend(new
>> RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints"))
>> >>
>> >>    val blinkEnvSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> >>    val streamTableEnv =
>> StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings)
>> >>
>> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,CheckpointingMode.EXACTLY_ONCE)
>> >>
>> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,Duration.ofSeconds(20))
>> >>
>> streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT,Duration.ofSeconds(900))
>> >>
>> >>
>> streamTableEnv.getConfig.getConfiguration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT,"5s")
>> >>
>> >>
>> >>并且,任务的并行度设置了1(这样是不是就不会存在flink consumer不消费kafka数据的情况,kafka一直生产数据的前提下)
>> >>在flink ui上,仍然显示Watermark No data,问下,你是怎么设置参数的
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>在 2020-08-13 14:02:58,"forideal" <fszw...@163.com> 写道:
>> >>>大家好
>> >>>
>> >>>       问题的原因定位到了。
>> >>>       由于无法 debug codegen 生成的代码,即使我拿到线上的数据,开启了debug环境依然无法得到进展。
>> >>>       这个时候,我进行了 disable chain,观察 watermark
>> 的生成情况,看看到底在那个环节没有继续往下传递。(因为多个 op chain 在一起,不能确定到底是那个环节存在问题)
>> >>>       发现在  WatermarkAssigner(rowtime=[event_time],
>> watermark=[(event_ti...)这个 op 中部分 task 为 No watermark,由于这个op和source
>> chain在一起,导致这个vertex 对应的watermark无法显示只能是 no data。因为存在 group by 下游的 watermark
>> 为 min(parent task output watermark),所以下游是 No watermark。导致在查问题的时候,比较困难。
>> >>>       定位到由于 kafka 部分 partition 无数据导致 No watermark 加上
>> table.exec.source.idle-timeout = 10s 参数即可。
>> >>>       当然,如果能直接 debug codegen 生成的代码,那么这个问题的分析路径会更简单。我应该直接可以发现大部分 task
>> 可以生成 watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用  disable chain
>> 观察每个 op 的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。
>> >>>
>> >>>
>> >>>Best forideal
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>在 2020-08-13 12:56:57,"forideal" <fszw...@163.com> 写道:
>> >>>>大家好
>> >>>>
>> >>>>
>> >>>>        关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是
>> StreamExecWatermarkAssigner
>> >>>>        在translateToPlanInternal 中生成了如下一个 class 代码,
>> >>>>public final class WatermarkGenerator$2 extends
>> org.apache.flink.table.runtime.generated.WatermarkGenerator { public
>> WatermarkGenerator$2(Object[] references) throws Exception { } @Override
>> public void open(org.apache.flink.configuration.Configuration parameters)
>> throws Exception { } @Override public Long
>> currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws
>> Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean
>> isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp
>> result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) {
>> field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5
>> = null; if (!isNull$4) { result$5 =
>> org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
>> - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) {
>> return null; } else { return result$5.getMillisecond(); } } @Override
>> public void close() throws Exception { } }
>> >>>>
>> >>>>
>> >>>>
>> >>>>       其中关键的信息是 result$5 =
>> org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
>> - ((long) 10000L), field$3.getNanoOfMillisecond());
>> >>>>确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
>> 的定义获取的 watermark。
>> >>>>在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark
>> 这样的结果。因为这部分codegen的代码确实无法进一步debug了。
>> >>>>如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢
>> >>>>
>> >>>>  Best forideal
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>在 2020-08-11 17:13:01,"forideal" <fszw...@163.com> 写道:
>> >>>>>大家好,请教一个问题
>> >>>>>
>> >>>>>
>> >>>>>           我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic
>> 的时候,是可以生成 watermark。消费大量的数据的时候,就无法生成watermark。
>> >>>>>           一直是    No Watermark。 暂时找不到排查问题的思路。
>> >>>>>          Flink 版本号是 1.10,kafka
>> 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 EventTime mode 模式,Blink Planner。
>> >>>>>|
>> >>>>>No Watermark |
>> >>>>>           SQL如下
>> >>>>>
>> >>>>>
>> >>>>>          DDL:
>> >>>>>           create table test(
>> >>>>>                       user_id varchar,
>> >>>>>                       action varchar,
>> >>>>>                       event_time TIMESTAMP(3),
>> >>>>>                       WATERMARK FOR event_time AS event_time -
>> INTERVAL '10' SECOND
>> >>>>>           ) with();
>> >>>>>
>> >>>>>
>> >>>>>          DML:
>> >>>>>insert into
>> >>>>>  console
>> >>>>>select
>> >>>>>  user_id,
>> >>>>>  f_get_str(bind_id) as id_list
>> >>>>>from
>> >>>>>  (
>> >>>>>    select
>> >>>>>      action as bind_id,
>> >>>>>      user_id,
>> >>>>>      event_time
>> >>>>>    from
>> >>>>>      (
>> >>>>>        SELECT
>> >>>>>          user_id,
>> >>>>>          action,
>> >>>>>          PROCTIME() as proc_time,
>> >>>>>          event_time
>> >>>>>        FROM
>> >>>>>          test
>> >>>>>      ) T
>> >>>>>    where
>> >>>>>      user_id is not null
>> >>>>>      and user_id <> ''
>> >>>>>      and CHARACTER_LENGTH(user_id) = 24
>> >>>>>  ) T
>> >>>>>group by
>> >>>>>  SESSION(event_time, INTERVAL '10' SECOND),
>> >>>>>  user_id
>> >>>>>
>> >>>>>Best forideal
>>

回复