没搞懂你们说的每一条记录一次入库是如何观察出来的,1s的周期,那么快咋观察的。
aggregate就是每条记录触发一次,但最终输出到下游是1s周期到了才输出的。

王磊2 <[email protected]> 于2020年8月19日周三 下午7:18写道:

> 我改成下面这样还是同样的问题
> 我用 kafka 客户端连续发送了 100 条记录,连续执行了 100 次入库,每次入库的条数 +1
>
>
>
> &nbsp;keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(
>     new ProcessWindowFunction<ObjectNode, List<ObjectNode&gt;,
> Tuple2<String, JsonNode&gt;, TimeWindow&gt;() {
>         @Override
>         public void process(Tuple2<String, JsonNode&gt;
> stringJsonNodeTuple2, Context context,
>             Iterable<ObjectNode&gt; iterable,
> Collector<List<ObjectNode&gt;&gt; collector) throws Exception {
>             List<ObjectNode&gt; lists = new ArrayList<ObjectNode&gt;();
>             for(ObjectNode node : iterable){
>                 lists.add(node);
>             }
>             collector.collect(lists);
>         }
>     }).addSink(new TemplateMySQLSink());&nbsp;
> ------------------&nbsp;Original&nbsp;------------------
> From: &nbsp;"guoliang_wang1335"<[email protected]&gt;;
> Date: &nbsp;Wed, Aug 19, 2020 06:49 PM
> To: &nbsp;"user-zh"<[email protected]&gt;;
>
> Subject: &nbsp;Re:TumblingProcessingTimeWindows  每来一条消息就会 trigger
> aggregate function
>
> &nbsp;
>
>
> 依据我的理解,aggregate是增量计算的,在1s窗口触发后,只会一次sink。将一段消息merge成List,可使用ProcessWindowFunction。
>
>
>
>
> 在 2020-08-19 18:27:25,"[email protected]" <[email protected]&gt;
> 写道:
> &gt;
> &gt;接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中
> &gt;
> &gt;keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new
> ListAggregate()).addSink(new TemplateMySQLSink());
> &gt;
> &gt;
> &gt;ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次
> &gt;
> &gt;但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作
> &gt;
> &gt;有什么方式让一个窗口只做一次 aggregate 操作吗?
> &gt;
> &gt;谢谢,
> &gt;王磊
> &gt;
> &gt;
> &gt;
> &gt;[email protected]
> &gt;

回复