Hi,

应该是并行度的原因,你可以先将并行度设置为1试试。




Best regards,
haishui





在 2024-01-03 12:24:20,"ha.fen...@aisino.com" <ha.fen...@aisino.com> 写道:
>帮我看看代码,感觉是代码的问题,使用滚动窗口问题一样,5分钟的滚动,也是输入1704130441000才触发函数的
>public static void main(String[] args) throws Exception {
>        StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>
>        SingleOutputStreamOperator<UserItem> 
> userItemSingleOutputStreamOperator = env
>                .socketTextStream("172.24.6.109", 9999)
>                .flatMap(new Splitter());
>
>        WatermarkStrategy<UserItem> watermarkStrategy = WatermarkStrategy
>                .<UserItem>forMonotonousTimestamps()
>                .withTimestampAssigner(new 
> SerializableTimestampAssigner<UserItem>() {
>                    @Override
>                    public long extractTimestamp(UserItem element, long 
> recordTimestamp) {
>                        System.out.println(element.timestamp);
>                        return element.timestamp;
>                    }
>                });
>        SingleOutputStreamOperator<UserItem> watermark = 
> userItemSingleOutputStreamOperator.assignTimestampsAndWatermarks(watermarkStrategy);
>        watermark.keyBy(item -> item.itemid)
>               // 
> .window(SlidingEventTimeWindows.of(Time.minutes(60),Time.minutes(5)))
>                .window(TumblingEventTimeWindows.of(Time.minutes(5)))
>                .aggregate(new AggregateFunctionMethod(), new MyProcess())
>                .print();
>
>        env.execute();
>    }
>    public static class Splitter implements FlatMapFunction<String, UserItem> {
>        @Override
>        public void flatMap(String sentence, Collector<UserItem> out) throws 
> Exception {
>            String[] words = sentence.split(",");
>                out.collect(new 
> UserItem(Integer.parseInt(words[0]),Integer.parseInt(words[1]),Long.parseLong(words[2])));
>            }
>        }
>
>    public static class AggregateFunctionMethod implements 
> AggregateFunction<UserItem, Integer, Integer> {
>        @Override
>        public Integer createAccumulator() {
>            return 0;
>        }
>
>        @Override
>        public Integer add(UserItem item, Integer i) {
>            return i+1;
>        }
>
>        @Override
>        public Integer getResult(Integer i) {
>            return i;
>        }
>
>        @Override
>        public Integer merge(Integer i, Integer acc1) {
>            return i+acc1;
>        }
>    }
>
>    public static class MyProcess extends 
> ProcessWindowFunction<Integer,String,Integer, TimeWindow> {
>
>        @Override
>        public void process(Integer s, Context context, Iterable<Integer> 
> elements, Collector<String> out) throws Exception {
>            long startTs = context.window().getStart();
>            long endTs = context.window().getEnd();
>            String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd 
> HH:mm:ss.SSS");
>            String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd 
> HH:mm:ss.SSS");
>            StringBuilder sb = new StringBuilder();
>            sb.append("窗口["+windowStart+","+windowEnd);
>            out.collect(sb.toString());
>        }
>    }
> 
>发件人: Xuyang
>发送时间: 2024-01-03 09:36
>收件人: user-zh
>主题: Re:Re: 滑动窗口按照处理时间触发的问题
>Hi,
>基本思路和Jinsui说的差不多,我怀疑也是watermark没有推进导致窗口没有开窗。具体可以debug一下EventTimeTrigger里的‘onElement’方法和‘onEventTime’方法。
> 
> 
> 
> 
>--
> 
>    Best!
>    Xuyang
> 
> 
> 
> 
> 
>在 2024-01-02 23:31:54,"Jinsui Chen" <ischenjin...@gmail.com> 写道:
>>Hi,
>>
>>请问是否可以将所有代码贴出来,尤其是水位线相关的。因为事件时间的推进和水位线策略紧密相关。
>>
>>假设这样一种情况,将时间戳作为事件时间,假设你的水位线容错间隔设置为10min,就会出现上述情况,原因如下:
>>1. 首先是时间窗口的对齐逻辑。窗口是根据 Epoch 时间(1970-01-01 00:00:00
>>UTC)来对齐的。例如,如果窗口大小为5分钟,那么窗口的开始时间会是00:00、00:05、00:10等等很整的值,而不是事件时间。这也是为什么你的第一条数据会落在
>>00:20 - 01:20 这个时间窗口上。
>>2. 对于事件时间窗口,触发窗口计算的时机是‘水位线大于窗口结束时间’,也就是需要一条事件时间在 01:30 后的数据才会触发 00:20 -
>>01:20 这个窗口。如果想要触发 1704129661000 对应的 00:25 - 01:25 窗口,需要一条事件时间大于 01:35
>>的数据,而现实是没有这样的一条数据。
>>
>>我猜测你的水位线容错间隔是10min-14min 的值,导致了上述情况。
>>
>>Best regards,
>>Jinsui
>>
>>ha.fen...@aisino.com <ha.fen...@aisino.com> 于2024年1月2日周二 20:17写道:
>>
>>>
>>> 程序是一个滑动窗口SlidingEventTimeWindows.of(Time.minutes(60),Time.minutes(5)),
>>> 处理函数aggregate(new AggregateFunctionMethod(), new MyProcess())。
>>> 我使用socket方式进行录入数据
>>> 2024-01-02 01:19:01  1704129541000
>>> 2024-01-02 01:21:01  1704129661000
>>> 2024-01-02 01:26:01  1704129961000
>>> 2024-01-02 01:29:01  1704130141000
>>> 2024-01-02 01:34:01  1704130441000
>>> 前面是对应的时间,后面是我录入系统的时间
>>> MyProcess类触发的时间是最后一次录入1704130441000的时候,输出窗口时间为
>>> 2024-01-02 00:20:00.000,2024-01-02 01:20:00.000
>>> 我认为应该录入1704129661000的时候就应该触发窗口函数了,但是并没有,所以我想问5分钟触发窗口的时间到底是怎么回事?
>>>

回复