我改成下面这样还是同样的问题
我用 kafka 客户端连续发送了 100 条记录,连续执行了 100 次入库,每次入库的条数 +1


 keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(
    new ProcessWindowFunction<ObjectNode, List<ObjectNode&gt;, Tuple2<String, 
JsonNode&gt;, TimeWindow&gt;() {
        @Override
        public void process(Tuple2<String, JsonNode&gt; stringJsonNodeTuple2, 
Context context,
            Iterable<ObjectNode&gt; iterable, Collector<List<ObjectNode&gt;&gt; 
collector) throws Exception {
            List<ObjectNode&gt; lists = new ArrayList<ObjectNode&gt;();
            for(ObjectNode node : iterable){
                lists.add(node);
            }
            collector.collect(lists);
        }
    }).addSink(new TemplateMySQLSink());&nbsp;
------------------&nbsp;Original&nbsp;------------------
From: &nbsp;"guoliang_wang1335"<[email protected]&gt;;
Date: &nbsp;Wed, Aug 19, 2020 06:49 PM
To: &nbsp;"user-zh"<[email protected]&gt;; 

Subject: &nbsp;Re:TumblingProcessingTimeWindows  每来一条消息就会 trigger aggregate 
function

&nbsp;

依据我的理解,aggregate是增量计算的,在1s窗口触发后,只会一次sink。将一段消息merge成List,可使用ProcessWindowFunction。




在 2020-08-19 18:27:25,"[email protected]" <[email protected]&gt; 写道:
&gt;
&gt;接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中
&gt;
&gt;keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new
 ListAggregate()).addSink(new TemplateMySQLSink()); 
&gt;
&gt;
&gt;ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次
&gt;
&gt;但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作
&gt;
&gt;有什么方式让一个窗口只做一次 aggregate 操作吗?
&gt;
&gt;谢谢,
&gt;王磊
&gt;
&gt;
&gt;
&gt;[email protected] 
&gt;

回复