比如并行度是4,任务执行图是: Source(p=1) ==reblance=> flatMap和Timestamp/watermrk(p=4) =hash=> window(p=4) window的水位线取上游四个算子水位线的最小值, 你需要写4个数据,才能让四个子任务水位线更新,window的水位线才有一次更新 Best regards, haishui
在 2024-01-03 14:25:48,"ha.fen...@aisino.com" <ha.fen...@aisino.com> 写道: >设置并行度1确实可以了。env.setParallelism(1); >这里按照key分组,我输入的key都是同一个值,应该在同一个分区。 >watermark.keyBy(item -> item.itemid) >我理解的就是处理环节其实就是单并行度的,没有问题。问题出在socket这种方式必须使用单并行度? >发件人: haishui >发送时间: 2024-01-03 13:35 >收件人: user-zh >主题: Re:回复: Re: 滑动窗口按照处理时间触发的问题 >Hi, > > >应该是并行度的原因,你可以先将并行度设置为1试试。 > > > > >Best regards, >haishui > > > > > >在 2024-01-03 12:24:20,"ha.fen...@aisino.com" <ha.fen...@aisino.com> 写道: >>帮我看看代码,感觉是代码的问题,使用滚动窗口问题一样,5分钟的滚动,也是输入1704130441000才触发函数的 >>public static void main(String[] args) throws Exception { >> StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> >> SingleOutputStreamOperator<UserItem> >> userItemSingleOutputStreamOperator = env >> .socketTextStream("172.24.6.109", 9999) >> .flatMap(new Splitter()); >> >> WatermarkStrategy<UserItem> watermarkStrategy = WatermarkStrategy >> .<UserItem>forMonotonousTimestamps() >> .withTimestampAssigner(new >> SerializableTimestampAssigner<UserItem>() { >> @Override >> public long extractTimestamp(UserItem element, long >> recordTimestamp) { >> System.out.println(element.timestamp); >> return element.timestamp; >> } >> }); >> SingleOutputStreamOperator<UserItem> watermark = >> userItemSingleOutputStreamOperator.assignTimestampsAndWatermarks(watermarkStrategy); >> watermark.keyBy(item -> item.itemid) >> // >> .window(SlidingEventTimeWindows.of(Time.minutes(60),Time.minutes(5))) >> .window(TumblingEventTimeWindows.of(Time.minutes(5))) >> .aggregate(new AggregateFunctionMethod(), new MyProcess()) >> .print(); >> >> env.execute(); >> } >> public static class Splitter implements FlatMapFunction<String, UserItem> >> { >> @Override >> public void flatMap(String sentence, Collector<UserItem> out) throws >> Exception { >> String[] words = sentence.split(","); >> out.collect(new >> UserItem(Integer.parseInt(words[0]),Integer.parseInt(words[1]),Long.parseLong(words[2]))); >> } >> } >> >> public static class AggregateFunctionMethod implements >> AggregateFunction<UserItem, Integer, Integer> { >> @Override >> public Integer createAccumulator() { >> return 0; >> } >> >> @Override >> public Integer add(UserItem item, Integer i) { >> return i+1; >> } >> >> @Override >> public Integer getResult(Integer i) { >> return i; >> } >> >> @Override >> public Integer merge(Integer i, Integer acc1) { >> return i+acc1; >> } >> } >> >> public static class MyProcess extends >> ProcessWindowFunction<Integer,String,Integer, TimeWindow> { >> >> @Override >> public void process(Integer s, Context context, Iterable<Integer> >> elements, Collector<String> out) throws Exception { >> long startTs = context.window().getStart(); >> long endTs = context.window().getEnd(); >> String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd >> HH:mm:ss.SSS"); >> String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd >> HH:mm:ss.SSS"); >> StringBuilder sb = new StringBuilder(); >> sb.append("窗口["+windowStart+","+windowEnd); >> out.collect(sb.toString()); >> } >> } >> >>发件人: Xuyang >>发送时间: 2024-01-03 09:36 >>收件人: user-zh >>主题: Re:Re: 滑动窗口按照处理时间触发的问题 >>Hi, >>基本思路和Jinsui说的差不多,我怀疑也是watermark没有推进导致窗口没有开窗。具体可以debug一下EventTimeTrigger里的‘onElement’方法和‘onEventTime’方法。 >> >> >> >> >>-- >> >> Best! >> Xuyang >> >> >> >> >> >>在 2024-01-02 23:31:54,"Jinsui Chen" <ischenjin...@gmail.com> 写道: >>>Hi, >>> >>>请问是否可以将所有代码贴出来,尤其是水位线相关的。因为事件时间的推进和水位线策略紧密相关。 >>> >>>假设这样一种情况,将时间戳作为事件时间,假设你的水位线容错间隔设置为10min,就会出现上述情况,原因如下: >>>1. 首先是时间窗口的对齐逻辑。窗口是根据 Epoch 时间(1970-01-01 00:00:00 >>>UTC)来对齐的。例如,如果窗口大小为5分钟,那么窗口的开始时间会是00:00、00:05、00:10等等很整的值,而不是事件时间。这也是为什么你的第一条数据会落在 >>>00:20 - 01:20 这个时间窗口上。 >>>2. 对于事件时间窗口,触发窗口计算的时机是‘水位线大于窗口结束时间’,也就是需要一条事件时间在 01:30 后的数据才会触发 00:20 - >>>01:20 这个窗口。如果想要触发 1704129661000 对应的 00:25 - 01:25 窗口,需要一条事件时间大于 01:35 >>>的数据,而现实是没有这样的一条数据。 >>> >>>我猜测你的水位线容错间隔是10min-14min 的值,导致了上述情况。 >>> >>>Best regards, >>>Jinsui >>> >>>ha.fen...@aisino.com <ha.fen...@aisino.com> 于2024年1月2日周二 20:17写道: >>> >>>> >>>> 程序是一个滑动窗口SlidingEventTimeWindows.of(Time.minutes(60),Time.minutes(5)), >>>> 处理函数aggregate(new AggregateFunctionMethod(), new MyProcess())。 >>>> 我使用socket方式进行录入数据 >>>> 2024-01-02 01:19:01 1704129541000 >>>> 2024-01-02 01:21:01 1704129661000 >>>> 2024-01-02 01:26:01 1704129961000 >>>> 2024-01-02 01:29:01 1704130141000 >>>> 2024-01-02 01:34:01 1704130441000 >>>> 前面是对应的时间,后面是我录入系统的时间 >>>> MyProcess类触发的时间是最后一次录入1704130441000的时候,输出窗口时间为 >>>> 2024-01-02 00:20:00.000,2024-01-02 01:20:00.000 >>>> 我认为应该录入1704129661000的时候就应该触发窗口函数了,但是并没有,所以我想问5分钟触发窗口的时间到底是怎么回事? >>>>