依据我的理解,aggregate是增量计算的,在1s窗口触发后,只会一次sink。将一段消息merge成List,可使用ProcessWindowFunction。




在 2020-08-19 18:27:25,"[email protected]" <[email protected]> 写道:
>
>接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中
>
>keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new
> ListAggregate()).addSink(new TemplateMySQLSink()); 
>
>
>ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次
>
>但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作
>
>有什么方式让一个窗口只做一次 aggregate 操作吗?
>
>谢谢,
>王磊
>
>
>
>[email protected] 
>

回复