我改成下面这样还是同样的问题
我用 kafka 客户端连续发送了 100 条记录,连续执行了 100 次入库,每次入库的条数 +1
keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(
new ProcessWindowFunction<ObjectNode, List<ObjectNode>, Tuple2<String,
JsonNode>, TimeWindow>() {
@Override
public void process(Tuple2<String, JsonNode> stringJsonNodeTuple2,
Context context,
Iterable<ObjectNode> iterable, Collector<List<ObjectNode>>
collector) throws Exception {
List<ObjectNode> lists = new ArrayList<ObjectNode>();
for(ObjectNode node : iterable){
lists.add(node);
}
collector.collect(lists);
}
}).addSink(new TemplateMySQLSink());
------------------ Original ------------------
From: "guoliang_wang1335"<[email protected]>;
Date: Wed, Aug 19, 2020 06:49 PM
To: "user-zh"<[email protected]>;
Subject: Re:TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate
function
依据我的理解,aggregate是增量计算的,在1s窗口触发后,只会一次sink。将一段消息merge成List,可使用ProcessWindowFunction。
在 2020-08-19 18:27:25,"[email protected]" <[email protected]> 写道:
>
>接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中
>
>keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new
ListAggregate()).addSink(new TemplateMySQLSink());
>
>
>ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次
>
>但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作
>
>有什么方式让一个窗口只做一次 aggregate 操作吗?
>
>谢谢,
>王磊
>
>
>
>[email protected]
>