我把 windows 时间设置成了 10s , 直接用命令行往kafka 发消息


head -100 filename  |./bin/kafka-console-producer --broker-list 
172.19.78.50:9092,172.19.78.51:9092,172.19.78.52:9092 --topic 
ods_artemis_out_order --property  parse.key=true 


在 TemplateMySQLSink log 日志,运行后在 日志中看到的




------------------ Original ------------------
From: &nbsp;"赵一旦"<[email protected]&gt;;
Date: &nbsp;Wed, Aug 19, 2020 10:04 PM
To: &nbsp;"user-zh"<[email protected]&gt;; 

Subject: &nbsp;Re: TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate 
function

&nbsp;

没搞懂你们说的每一条记录一次入库是如何观察出来的,1s的周期,那么快咋观察的。
aggregate就是每条记录触发一次,但最终输出到下游是1s周期到了才输出的。

王磊2 <[email protected]&gt; 于2020年8月19日周三 下午7:18写道:

&gt; 我改成下面这样还是同样的问题
&gt; 我用 kafka 客户端连续发送了 100 条记录,连续执行了 100 次入库,每次入库的条数 +1
&gt;
&gt;
&gt;
&gt; 
&amp;nbsp;keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(
&gt;&nbsp;&nbsp;&nbsp;&nbsp; new ProcessWindowFunction<ObjectNode, 
List<ObjectNode&amp;gt;,
&gt; Tuple2<String, JsonNode&amp;gt;, TimeWindow&amp;gt;() {
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; @Override
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; public void 
process(Tuple2<String, JsonNode&amp;gt;
&gt; stringJsonNodeTuple2, Context context,
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
Iterable<ObjectNode&amp;gt; iterable,
&gt; Collector<List<ObjectNode&amp;gt;&amp;gt; collector) throws Exception {
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
List<ObjectNode&amp;gt; lists = new ArrayList<ObjectNode&amp;gt;();
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
for(ObjectNode node : iterable){
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 lists.add(node);
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
collector.collect(lists);
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&gt;&nbsp;&nbsp;&nbsp;&nbsp; }).addSink(new TemplateMySQLSink());&amp;nbsp;
&gt; ------------------&amp;nbsp;Original&amp;nbsp;------------------
&gt; From: &amp;nbsp;"guoliang_wang1335"<[email protected]&amp;gt;;
&gt; Date: &amp;nbsp;Wed, Aug 19, 2020 06:49 PM
&gt; To: &amp;nbsp;"user-zh"<[email protected]&amp;gt;;
&gt;
&gt; Subject: &amp;nbsp;Re:TumblingProcessingTimeWindows&nbsp; 每来一条消息就会 trigger
&gt; aggregate function
&gt;
&gt; &amp;nbsp;
&gt;
&gt;
&gt; 
依据我的理解,aggregate是增量计算的,在1s窗口触发后,只会一次sink。将一段消息merge成List,可使用ProcessWindowFunction。
&gt;
&gt;
&gt;
&gt;
&gt; 在 2020-08-19 18:27:25,"[email protected]" 
<[email protected]&amp;gt;
&gt; 写道:
&gt; &amp;gt;
&gt; &amp;gt;接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中
&gt; &amp;gt;
&gt; 
&amp;gt;keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new
&gt; ListAggregate()).addSink(new TemplateMySQLSink());
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 
后续只操作数据库一次
&gt; &amp;gt;
&gt; &amp;gt;但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作
&gt; &amp;gt;
&gt; &amp;gt;有什么方式让一个窗口只做一次 aggregate 操作吗?
&gt; &amp;gt;
&gt; &amp;gt;谢谢,
&gt; &amp;gt;王磊
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;[email protected]
&gt; &amp;gt;

回复