Re:回复: Re: 滑动窗口按照处理时间触发的问题

2024-01-02 文章 haishui
比如并行度是4,任务执行图是: Source(p=1) ==reblance=> flatMap和Timestamp/watermrk(p=4)  
=hash=> window(p=4)
window的水位线取上游四个算子水位线的最小值, 你需要写4个数据,才能让四个子任务水位线更新,window的水位线才有一次更新
Best regards,
haishui





在 2024-01-03 14:25:48,"ha.fen...@aisino.com"  写道:
>设置并行度1确实可以了。env.setParallelism(1);
>这里按照key分组,我输入的key都是同一个值,应该在同一个分区。
>watermark.keyBy(item -> item.itemid)
>我理解的就是处理环节其实就是单并行度的,没有问题。问题出在socket这种方式必须使用单并行度?
>发件人: haishui
>发送时间: 2024-01-03 13:35
>收件人: user-zh
>主题: Re:回复: Re: 滑动窗口按照处理时间触发的问题
>Hi,
> 
> 
>应该是并行度的原因,你可以先将并行度设置为1试试。
> 
> 
> 
> 
>Best regards,
>haishui
> 
> 
> 
> 
> 
>在 2024-01-03 12:24:20,"ha.fen...@aisino.com"  写道:
>>帮我看看代码,感觉是代码的问题,使用滚动窗口问题一样,5分钟的滚动,也是输入1704130441000才触发函数的
>>public static void main(String[] args) throws Exception {
>>StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>SingleOutputStreamOperator 
>> userItemSingleOutputStreamOperator = env
>>.socketTextStream("172.24.6.109", )
>>.flatMap(new Splitter());
>>
>>WatermarkStrategy watermarkStrategy = WatermarkStrategy
>>.forMonotonousTimestamps()
>>.withTimestampAssigner(new 
>> SerializableTimestampAssigner() {
>>@Override
>>public long extractTimestamp(UserItem element, long 
>> recordTimestamp) {
>>System.out.println(element.timestamp);
>>return element.timestamp;
>>}
>>});
>>SingleOutputStreamOperator watermark = 
>> userItemSingleOutputStreamOperator.assignTimestampsAndWatermarks(watermarkStrategy);
>>watermark.keyBy(item -> item.itemid)
>>   // 
>> .window(SlidingEventTimeWindows.of(Time.minutes(60),Time.minutes(5)))
>>.window(TumblingEventTimeWindows.of(Time.minutes(5)))
>>.aggregate(new AggregateFunctionMethod(), new MyProcess())
>>.print();
>>
>>env.execute();
>>}
>>public static class Splitter implements FlatMapFunction 
>> {
>>@Override
>>public void flatMap(String sentence, Collector out) throws 
>> Exception {
>>String[] words = sentence.split(",");
>>out.collect(new 
>> UserItem(Integer.parseInt(words[0]),Integer.parseInt(words[1]),Long.parseLong(words[2])));
>>}
>>}
>>
>>public static class AggregateFunctionMethod implements 
>> AggregateFunction {
>>@Override
>>public Integer createAccumulator() {
>>return 0;
>>}
>>
>>@Override
>>public Integer add(UserItem item, Integer i) {
>>return i+1;
>>}
>>
>>@Override
>>public Integer getResult(Integer i) {
>>return i;
>>}
>>
>>@Override
>>public Integer merge(Integer i, Integer acc1) {
>>return i+acc1;
>>}
>>}
>>
>>public static class MyProcess extends 
>> ProcessWindowFunction {
>>
>>@Override
>>public void process(Integer s, Context context, Iterable 
>> elements, Collector out) throws Exception {
>>long startTs = context.window().getStart();
>>long endTs = context.window().getEnd();
>>String windowStart = DateFormatUtils.format(startTs, "-MM-dd 
>> HH:mm:ss.SSS");
>>String windowEnd = DateFormatUtils.format(endTs, "-MM-dd 
>> HH:mm:ss.SSS");
>>StringBuilder sb = new StringBuilder();
>>sb.append("窗口["+windowStart+","+windowEnd);
>>out.collect(sb.toString());
>>}
>>}
>> 
>>发件人: Xuyang
>>发送时间: 2024-01-03 09:36
>>收件人: user-zh
>>主题: Re:Re: 滑动窗口按照处理时间触发的问题
>>Hi,
>>基本思路和Jinsui说的差不多,我怀疑也是watermark没有推进导致窗口没有开窗。具体可以debug一下EventTimeTrigger里的‘onElement’方法和‘onEventTime’方法。
>> 
>> 
>> 
>> 
>>--
>> 
>>Best!
>>Xuyang
>> 
>> 
>> 
>> 
>> 
>>在 2024-01-02 23:31:54,"Jinsui Chen"  写道:
>>>Hi,
>>>
>>>请问是否可以将所有代码贴出来,尤其是水位线相关的。因为事件时间的推进和水位线策略紧密相关。
>>>
>>>假设这样一种情况,将时间戳作为事件时间,假设你的水位线容错间隔设置为10min,就会出现上述情况,原因如下:
>>>1. 首先是时间窗口的对齐逻辑。窗口是根据 Epoch 时间(1970-01-01 00:00:00
>>>UTC)来对齐的。例如,如果窗口大小为5分钟,那么窗口的开始时间会是00:00、00:05、00:10等等很整的值,而不是事件时间。这也是为什么你的第一条数据会落在
>>>00:20 - 01:20 这个时间窗口上。
>>>2. 对于事件时间窗口,触发窗口计算的时机是‘水位线大于窗口结束时间’,也就是需要一条事件时间在 01:30 后的数据才会触发 00:20 -
>>>01:20 这个窗口。如果想要触发 1704129661000 对应的 00:25 - 01:25 窗口,需要一条事件时间大于 01:35
>>>的数据,而现实是没有这样的一条数据。
>>>
>>>我猜测你的水位线容错间隔是10min-14min 的值,导致了上述情况。
>>>
>>>Best regards,
>>>Jinsui
>>>
>>>ha.fen...@aisino.com  于2024年1月2日周二 20:17写道:
>>>

 程序是一个滑动窗口SlidingEventTimeWindows.of(Time.minutes(60),Time.minutes(5)),
 处理函数aggregate(new AggregateFunctionMethod(), new MyProcess())。
 我使用socket方式进行录入数据
 2024-01-02 01:19:01  1704129541000
 2024-01-02 01:21:01  1704129661000
 2024-01-02 01:26:01  1704129961000
 2024-01-02 01:29:01  1704130141000
 2024-01-02 01:34:01  1704130441000
 前面是对应的时间,后面是我录入系统的时间
 MyProcess类触发的时间是最后一次录入1704130441000的时候,输出窗口时间为
 2024-01-02 00

Re:回复: Re: 滑动窗口按照处理时间触发的问题

2024-01-02 文章 haishui
Hi,


应该是并行度的原因,你可以先将并行度设置为1试试。




Best regards,
haishui





在 2024-01-03 12:24:20,"ha.fen...@aisino.com"  写道:
>帮我看看代码,感觉是代码的问题,使用滚动窗口问题一样,5分钟的滚动,也是输入1704130441000才触发函数的
>public static void main(String[] args) throws Exception {
>StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>
>SingleOutputStreamOperator 
> userItemSingleOutputStreamOperator = env
>.socketTextStream("172.24.6.109", )
>.flatMap(new Splitter());
>
>WatermarkStrategy watermarkStrategy = WatermarkStrategy
>.forMonotonousTimestamps()
>.withTimestampAssigner(new 
> SerializableTimestampAssigner() {
>@Override
>public long extractTimestamp(UserItem element, long 
> recordTimestamp) {
>System.out.println(element.timestamp);
>return element.timestamp;
>}
>});
>SingleOutputStreamOperator watermark = 
> userItemSingleOutputStreamOperator.assignTimestampsAndWatermarks(watermarkStrategy);
>watermark.keyBy(item -> item.itemid)
>   // 
> .window(SlidingEventTimeWindows.of(Time.minutes(60),Time.minutes(5)))
>.window(TumblingEventTimeWindows.of(Time.minutes(5)))
>.aggregate(new AggregateFunctionMethod(), new MyProcess())
>.print();
>
>env.execute();
>}
>public static class Splitter implements FlatMapFunction {
>@Override
>public void flatMap(String sentence, Collector out) throws 
> Exception {
>String[] words = sentence.split(",");
>out.collect(new 
> UserItem(Integer.parseInt(words[0]),Integer.parseInt(words[1]),Long.parseLong(words[2])));
>}
>}
>
>public static class AggregateFunctionMethod implements 
> AggregateFunction {
>@Override
>public Integer createAccumulator() {
>return 0;
>}
>
>@Override
>public Integer add(UserItem item, Integer i) {
>return i+1;
>}
>
>@Override
>public Integer getResult(Integer i) {
>return i;
>}
>
>@Override
>public Integer merge(Integer i, Integer acc1) {
>return i+acc1;
>}
>}
>
>public static class MyProcess extends 
> ProcessWindowFunction {
>
>@Override
>public void process(Integer s, Context context, Iterable 
> elements, Collector out) throws Exception {
>long startTs = context.window().getStart();
>long endTs = context.window().getEnd();
>String windowStart = DateFormatUtils.format(startTs, "-MM-dd 
> HH:mm:ss.SSS");
>String windowEnd = DateFormatUtils.format(endTs, "-MM-dd 
> HH:mm:ss.SSS");
>StringBuilder sb = new StringBuilder();
>sb.append("窗口["+windowStart+","+windowEnd);
>out.collect(sb.toString());
>}
>}
> 
>发件人: Xuyang
>发送时间: 2024-01-03 09:36
>收件人: user-zh
>主题: Re:Re: 滑动窗口按照处理时间触发的问题
>Hi,
>基本思路和Jinsui说的差不多,我怀疑也是watermark没有推进导致窗口没有开窗。具体可以debug一下EventTimeTrigger里的‘onElement’方法和‘onEventTime’方法。
> 
> 
> 
> 
>--
> 
>Best!
>Xuyang
> 
> 
> 
> 
> 
>在 2024-01-02 23:31:54,"Jinsui Chen"  写道:
>>Hi,
>>
>>请问是否可以将所有代码贴出来,尤其是水位线相关的。因为事件时间的推进和水位线策略紧密相关。
>>
>>假设这样一种情况,将时间戳作为事件时间,假设你的水位线容错间隔设置为10min,就会出现上述情况,原因如下:
>>1. 首先是时间窗口的对齐逻辑。窗口是根据 Epoch 时间(1970-01-01 00:00:00
>>UTC)来对齐的。例如,如果窗口大小为5分钟,那么窗口的开始时间会是00:00、00:05、00:10等等很整的值,而不是事件时间。这也是为什么你的第一条数据会落在
>>00:20 - 01:20 这个时间窗口上。
>>2. 对于事件时间窗口,触发窗口计算的时机是‘水位线大于窗口结束时间’,也就是需要一条事件时间在 01:30 后的数据才会触发 00:20 -
>>01:20 这个窗口。如果想要触发 1704129661000 对应的 00:25 - 01:25 窗口,需要一条事件时间大于 01:35
>>的数据,而现实是没有这样的一条数据。
>>
>>我猜测你的水位线容错间隔是10min-14min 的值,导致了上述情况。
>>
>>Best regards,
>>Jinsui
>>
>>ha.fen...@aisino.com  于2024年1月2日周二 20:17写道:
>>
>>>
>>> 程序是一个滑动窗口SlidingEventTimeWindows.of(Time.minutes(60),Time.minutes(5)),
>>> 处理函数aggregate(new AggregateFunctionMethod(), new MyProcess())。
>>> 我使用socket方式进行录入数据
>>> 2024-01-02 01:19:01  1704129541000
>>> 2024-01-02 01:21:01  1704129661000
>>> 2024-01-02 01:26:01  1704129961000
>>> 2024-01-02 01:29:01  1704130141000
>>> 2024-01-02 01:34:01  1704130441000
>>> 前面是对应的时间,后面是我录入系统的时间
>>> MyProcess类触发的时间是最后一次录入1704130441000的时候,输出窗口时间为
>>> 2024-01-02 00:20:00.000,2024-01-02 01:20:00.000
>>> 我认为应该录入1704129661000的时候就应该触发窗口函数了,但是并没有,所以我想问5分钟触发窗口的时间到底是怎么回事?
>>>


Re:Re: 滑动窗口按照处理时间触发的问题

2024-01-02 文章 Xuyang
Hi,
基本思路和Jinsui说的差不多,我怀疑也是watermark没有推进导致窗口没有开窗。具体可以debug一下EventTimeTrigger里的‘onElement’方法和‘onEventTime’方法。




--

Best!
Xuyang





在 2024-01-02 23:31:54,"Jinsui Chen"  写道:
>Hi,
>
>请问是否可以将所有代码贴出来,尤其是水位线相关的。因为事件时间的推进和水位线策略紧密相关。
>
>假设这样一种情况,将时间戳作为事件时间,假设你的水位线容错间隔设置为10min,就会出现上述情况,原因如下:
>1. 首先是时间窗口的对齐逻辑。窗口是根据 Epoch 时间(1970-01-01 00:00:00
>UTC)来对齐的。例如,如果窗口大小为5分钟,那么窗口的开始时间会是00:00、00:05、00:10等等很整的值,而不是事件时间。这也是为什么你的第一条数据会落在
>00:20 - 01:20 这个时间窗口上。
>2. 对于事件时间窗口,触发窗口计算的时机是‘水位线大于窗口结束时间’,也就是需要一条事件时间在 01:30 后的数据才会触发 00:20 -
>01:20 这个窗口。如果想要触发 1704129661000 对应的 00:25 - 01:25 窗口,需要一条事件时间大于 01:35
>的数据,而现实是没有这样的一条数据。
>
>我猜测你的水位线容错间隔是10min-14min 的值,导致了上述情况。
>
>Best regards,
>Jinsui
>
>ha.fen...@aisino.com  于2024年1月2日周二 20:17写道:
>
>>
>> 程序是一个滑动窗口SlidingEventTimeWindows.of(Time.minutes(60),Time.minutes(5)),
>> 处理函数aggregate(new AggregateFunctionMethod(), new MyProcess())。
>> 我使用socket方式进行录入数据
>> 2024-01-02 01:19:01  1704129541000
>> 2024-01-02 01:21:01  1704129661000
>> 2024-01-02 01:26:01  1704129961000
>> 2024-01-02 01:29:01  1704130141000
>> 2024-01-02 01:34:01  1704130441000
>> 前面是对应的时间,后面是我录入系统的时间
>> MyProcess类触发的时间是最后一次录入1704130441000的时候,输出窗口时间为
>> 2024-01-02 00:20:00.000,2024-01-02 01:20:00.000
>> 我认为应该录入1704129661000的时候就应该触发窗口函数了,但是并没有,所以我想问5分钟触发窗口的时间到底是怎么回事?
>>


Re: 滑动窗口按照处理时间触发的问题

2024-01-02 文章 Jinsui Chen
Hi,

请问是否可以将所有代码贴出来,尤其是水位线相关的。因为事件时间的推进和水位线策略紧密相关。

假设这样一种情况,将时间戳作为事件时间,假设你的水位线容错间隔设置为10min,就会出现上述情况,原因如下:
1. 首先是时间窗口的对齐逻辑。窗口是根据 Epoch 时间(1970-01-01 00:00:00
UTC)来对齐的。例如,如果窗口大小为5分钟,那么窗口的开始时间会是00:00、00:05、00:10等等很整的值,而不是事件时间。这也是为什么你的第一条数据会落在
00:20 - 01:20 这个时间窗口上。
2. 对于事件时间窗口,触发窗口计算的时机是‘水位线大于窗口结束时间’,也就是需要一条事件时间在 01:30 后的数据才会触发 00:20 -
01:20 这个窗口。如果想要触发 1704129661000 对应的 00:25 - 01:25 窗口,需要一条事件时间大于 01:35
的数据,而现实是没有这样的一条数据。

我猜测你的水位线容错间隔是10min-14min 的值,导致了上述情况。

Best regards,
Jinsui

ha.fen...@aisino.com  于2024年1月2日周二 20:17写道:

>
> 程序是一个滑动窗口SlidingEventTimeWindows.of(Time.minutes(60),Time.minutes(5)),
> 处理函数aggregate(new AggregateFunctionMethod(), new MyProcess())。
> 我使用socket方式进行录入数据
> 2024-01-02 01:19:01  1704129541000
> 2024-01-02 01:21:01  1704129661000
> 2024-01-02 01:26:01  1704129961000
> 2024-01-02 01:29:01  1704130141000
> 2024-01-02 01:34:01  1704130441000
> 前面是对应的时间,后面是我录入系统的时间
> MyProcess类触发的时间是最后一次录入1704130441000的时候,输出窗口时间为
> 2024-01-02 00:20:00.000,2024-01-02 01:20:00.000
> 我认为应该录入1704129661000的时候就应该触发窗口函数了,但是并没有,所以我想问5分钟触发窗口的时间到底是怎么回事?
>


RE: 如何在IDE里面调试flink cdc 3.0作业?

2024-01-02 文章 Jiabao Sun
Hi,

可以参考下这篇文档[1],进行简单的测试。

Best,
Jiabao

[1] 
https://docs.google.com/document/d/1L6cJiqYkAsZ_nDa3MgRwV3SKQuw5OrMbqGC4YgzgKR4/edit#heading=h.aybxdd96r62i


On 2024/01/02 08:02:10 "casel.chen" wrote:
> 我想在Intellij Idea里面调试mysql-to-doris.yaml作业要如何操作呢?flink-cdc-cli模块需要依赖 
> flink-cdc-pipeline-connector-mysql 和 flink-cdc-pipeline-connector-doris 模块么?

如何在IDE里面调试flink cdc 3.0作业?

2024-01-02 文章 casel.chen
我想在Intellij Idea里面调试mysql-to-doris.yaml作业要如何操作呢?flink-cdc-cli模块需要依赖 
flink-cdc-pipeline-connector-mysql 和 flink-cdc-pipeline-connector-doris 模块么?