看起来不应该,你这种写法表达的就是1s一次写mysql,不会多次。每条消息触发计算是没问题的,但窗口最终输出是1s窗口结束之后才会输出的吧。

[email protected] <[email protected]> 于2020年8月19日周三 下午6:27写道:

>
> 接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中
>
> keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new
> ListAggregate()).addSink(new TemplateMySQLSink());
>
>
> ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次
>
> 但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作
>
> 有什么方式让一个窗口只做一次 aggregate 操作吗?
>
> 谢谢,
> 王磊
>
>
>
> [email protected]
>
>

回复