比如并行度是4,任务执行图是: Source(p=1) ==reblance=> flatMap和Timestamp/watermrk(p=4)  
=hash=> window(p=4)
window的水位线取上游四个算子水位线的最小值, 你需要写4个数据,才能让四个子任务水位线更新,window的水位线才有一次更新
Best regards,
haishui





在 2024-01-03 14:25:48,"ha.fen...@aisino.com" <ha.fen...@aisino.com> 写道:
>设置并行度1确实可以了。env.setParallelism(1);
>这里按照key分组,我输入的key都是同一个值,应该在同一个分区。
>watermark.keyBy(item -> item.itemid)
>我理解的就是处理环节其实就是单并行度的,没有问题。问题出在socket这种方式必须使用单并行度?
>发件人: haishui
>发送时间: 2024-01-03 13:35
>收件人: user-zh
>主题: Re:回复: Re: 滑动窗口按照处理时间触发的问题
>Hi,
> 
> 
>应该是并行度的原因,你可以先将并行度设置为1试试。
> 
> 
> 
> 
>Best regards,
>haishui
> 
> 
> 
> 
> 
>在 2024-01-03 12:24:20,"ha.fen...@aisino.com" <ha.fen...@aisino.com> 写道:
>>帮我看看代码,感觉是代码的问题,使用滚动窗口问题一样,5分钟的滚动,也是输入1704130441000才触发函数的
>>public static void main(String[] args) throws Exception {
>>        StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>        SingleOutputStreamOperator<UserItem> 
>> userItemSingleOutputStreamOperator = env
>>                .socketTextStream("172.24.6.109", 9999)
>>                .flatMap(new Splitter());
>>
>>        WatermarkStrategy<UserItem> watermarkStrategy = WatermarkStrategy
>>                .<UserItem>forMonotonousTimestamps()
>>                .withTimestampAssigner(new 
>> SerializableTimestampAssigner<UserItem>() {
>>                    @Override
>>                    public long extractTimestamp(UserItem element, long 
>> recordTimestamp) {
>>                        System.out.println(element.timestamp);
>>                        return element.timestamp;
>>                    }
>>                });
>>        SingleOutputStreamOperator<UserItem> watermark = 
>> userItemSingleOutputStreamOperator.assignTimestampsAndWatermarks(watermarkStrategy);
>>        watermark.keyBy(item -> item.itemid)
>>               // 
>> .window(SlidingEventTimeWindows.of(Time.minutes(60),Time.minutes(5)))
>>                .window(TumblingEventTimeWindows.of(Time.minutes(5)))
>>                .aggregate(new AggregateFunctionMethod(), new MyProcess())
>>                .print();
>>
>>        env.execute();
>>    }
>>    public static class Splitter implements FlatMapFunction<String, UserItem> 
>> {
>>        @Override
>>        public void flatMap(String sentence, Collector<UserItem> out) throws 
>> Exception {
>>            String[] words = sentence.split(",");
>>                out.collect(new 
>> UserItem(Integer.parseInt(words[0]),Integer.parseInt(words[1]),Long.parseLong(words[2])));
>>            }
>>        }
>>
>>    public static class AggregateFunctionMethod implements 
>> AggregateFunction<UserItem, Integer, Integer> {
>>        @Override
>>        public Integer createAccumulator() {
>>            return 0;
>>        }
>>
>>        @Override
>>        public Integer add(UserItem item, Integer i) {
>>            return i+1;
>>        }
>>
>>        @Override
>>        public Integer getResult(Integer i) {
>>            return i;
>>        }
>>
>>        @Override
>>        public Integer merge(Integer i, Integer acc1) {
>>            return i+acc1;
>>        }
>>    }
>>
>>    public static class MyProcess extends 
>> ProcessWindowFunction<Integer,String,Integer, TimeWindow> {
>>
>>        @Override
>>        public void process(Integer s, Context context, Iterable<Integer> 
>> elements, Collector<String> out) throws Exception {
>>            long startTs = context.window().getStart();
>>            long endTs = context.window().getEnd();
>>            String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd 
>> HH:mm:ss.SSS");
>>            String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd 
>> HH:mm:ss.SSS");
>>            StringBuilder sb = new StringBuilder();
>>            sb.append("窗口["+windowStart+","+windowEnd);
>>            out.collect(sb.toString());
>>        }
>>    }
>> 
>>发件人: Xuyang
>>发送时间: 2024-01-03 09:36
>>收件人: user-zh
>>主题: Re:Re: 滑动窗口按照处理时间触发的问题
>>Hi,
>>基本思路和Jinsui说的差不多,我怀疑也是watermark没有推进导致窗口没有开窗。具体可以debug一下EventTimeTrigger里的‘onElement’方法和‘onEventTime’方法。
>> 
>> 
>> 
>> 
>>--
>> 
>>    Best!
>>    Xuyang
>> 
>> 
>> 
>> 
>> 
>>在 2024-01-02 23:31:54,"Jinsui Chen" <ischenjin...@gmail.com> 写道:
>>>Hi,
>>>
>>>请问是否可以将所有代码贴出来,尤其是水位线相关的。因为事件时间的推进和水位线策略紧密相关。
>>>
>>>假设这样一种情况,将时间戳作为事件时间,假设你的水位线容错间隔设置为10min,就会出现上述情况,原因如下:
>>>1. 首先是时间窗口的对齐逻辑。窗口是根据 Epoch 时间(1970-01-01 00:00:00
>>>UTC)来对齐的。例如,如果窗口大小为5分钟,那么窗口的开始时间会是00:00、00:05、00:10等等很整的值,而不是事件时间。这也是为什么你的第一条数据会落在
>>>00:20 - 01:20 这个时间窗口上。
>>>2. 对于事件时间窗口,触发窗口计算的时机是‘水位线大于窗口结束时间’,也就是需要一条事件时间在 01:30 后的数据才会触发 00:20 -
>>>01:20 这个窗口。如果想要触发 1704129661000 对应的 00:25 - 01:25 窗口,需要一条事件时间大于 01:35
>>>的数据,而现实是没有这样的一条数据。
>>>
>>>我猜测你的水位线容错间隔是10min-14min 的值,导致了上述情况。
>>>
>>>Best regards,
>>>Jinsui
>>>
>>>ha.fen...@aisino.com <ha.fen...@aisino.com> 于2024年1月2日周二 20:17写道:
>>>
>>>>
>>>> 程序是一个滑动窗口SlidingEventTimeWindows.of(Time.minutes(60),Time.minutes(5)),
>>>> 处理函数aggregate(new AggregateFunctionMethod(), new MyProcess())。
>>>> 我使用socket方式进行录入数据
>>>> 2024-01-02 01:19:01  1704129541000
>>>> 2024-01-02 01:21:01  1704129661000
>>>> 2024-01-02 01:26:01  1704129961000
>>>> 2024-01-02 01:29:01  1704130141000
>>>> 2024-01-02 01:34:01  1704130441000
>>>> 前面是对应的时间,后面是我录入系统的时间
>>>> MyProcess类触发的时间是最后一次录入1704130441000的时候,输出窗口时间为
>>>> 2024-01-02 00:20:00.000,2024-01-02 01:20:00.000
>>>> 我认为应该录入1704129661000的时候就应该触发窗口函数了,但是并没有,所以我想问5分钟触发窗口的时间到底是怎么回事?
>>>>

回复