我把 windows 时间设置成了 10s , 直接用命令行往kafka 发消息
head -100 filename |./bin/kafka-console-producer --broker-list 172.19.78.50:9092,172.19.78.51:9092,172.19.78.52:9092 --topic ods_artemis_out_order --property parse.key=true 在 TemplateMySQLSink log 日志,运行后在 日志中看到的 ------------------ Original ------------------ From: "赵一旦"<[email protected]>; Date: Wed, Aug 19, 2020 10:04 PM To: "user-zh"<[email protected]>; Subject: Re: TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function 没搞懂你们说的每一条记录一次入库是如何观察出来的,1s的周期,那么快咋观察的。 aggregate就是每条记录触发一次,但最终输出到下游是1s周期到了才输出的。 王磊2 <[email protected]> 于2020年8月19日周三 下午7:18写道: > 我改成下面这样还是同样的问题 > 我用 kafka 客户端连续发送了 100 条记录,连续执行了 100 次入库,每次入库的条数 +1 > > > > &nbsp;keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).process( > new ProcessWindowFunction<ObjectNode, List<ObjectNode&gt;, > Tuple2<String, JsonNode&gt;, TimeWindow&gt;() { > @Override > public void process(Tuple2<String, JsonNode&gt; > stringJsonNodeTuple2, Context context, > Iterable<ObjectNode&gt; iterable, > Collector<List<ObjectNode&gt;&gt; collector) throws Exception { > List<ObjectNode&gt; lists = new ArrayList<ObjectNode&gt;(); > for(ObjectNode node : iterable){ > lists.add(node); > } > collector.collect(lists); > } > }).addSink(new TemplateMySQLSink());&nbsp; > ------------------&nbsp;Original&nbsp;------------------ > From: &nbsp;"guoliang_wang1335"<[email protected]&gt;; > Date: &nbsp;Wed, Aug 19, 2020 06:49 PM > To: &nbsp;"user-zh"<[email protected]&gt;; > > Subject: &nbsp;Re:TumblingProcessingTimeWindows 每来一条消息就会 trigger > aggregate function > > &nbsp; > > > 依据我的理解,aggregate是增量计算的,在1s窗口触发后,只会一次sink。将一段消息merge成List,可使用ProcessWindowFunction。 > > > > > 在 2020-08-19 18:27:25,"[email protected]" <[email protected]&gt; > 写道: > &gt; > &gt;接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中 > &gt; > &gt;keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new > ListAggregate()).addSink(new TemplateMySQLSink()); > &gt; > &gt; > &gt;ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次 > &gt; > &gt;但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作 > &gt; > &gt;有什么方式让一个窗口只做一次 aggregate 操作吗? > &gt; > &gt;谢谢, > &gt;王磊 > &gt; > &gt; > &gt; > &gt;[email protected] > &gt;
