Reduce函数中,a可以认为是状态,你应该返回a试试,最好还是根据时间或者别的做个判断,然后输出,当然这些前提都是你的数据间隔小于10s 从 Windows 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>发送
发件人: Lei Wang<mailto:leiwang...@gmail.com> 发送时间: 2022年3月1日 11:20 收件人: user-zh@flink.apache.org<mailto:user-zh@flink.apache.org> 主题: Re: keyBy 后的 getKey 函数调用了两次 谢谢,了解了。 另外一个问题,我 timeWindown 之后只想保留最后一条在这个 window 中的数据直接输出: env.addSource(consumer).keyBy(new KeySelector<String, String>() { @Override public String getKey(String value) throws Exception { return value; } }).*timeWindow(Time.seconds(10)).reduce((a,b)->b).*addSink(new SinkTest(1)); 上面的代码我测试了下不符合预期,其实是逆序输出了窗口中左右的记录。 需要用什么方式实现这个功能比较合适呢? On Tue, Mar 1, 2022 at 10:52 AM yidan zhao <hinobl...@gmail.com> wrote: > > keyBy的时候调用一次,sink的时候应该也会调用一次。因为keyBy是hash分区,前后是不chain在一起的。sink部分处理输入的ele的时候需要基于keySelector获取key。 > > Lei Wang <leiwang...@gmail.com> 于2022年3月1日周二 10:49写道: > > > 接收 kafka 的数据后 keyBy, sinkTest 中什么也没做。 > > > > env.addSource(consumer).keyBy(new KeySelector<String, String>() { > > @Override > > public String getKey(String value) throws Exception { > > System.out.println(value); > > return value; > > } > > }).addSink(new SinkTest(1)); > > > > > > 我自己做测试,每发送一条消息console 会打印两次,也就是 System.out.println(value) 被调用了两次。 > > > > 为什么会这样呢,哪个地方还调用了 getKey 这个函数呢? > > > > > > 谢谢, > > > > 王磊 > > >