Re:回复: Re: 滑动窗口按照处理时间触发的问题
比如并行度是4,任务执行图是: Source(p=1) ==reblance=> flatMap和Timestamp/watermrk(p=4) =hash=> window(p=4) window的水位线取上游四个算子水位线的最小值, 你需要写4个数据,才能让四个子任务水位线更新,window的水位线才有一次更新 Best regards, haishui 在 2024-01-03 14:25:48,"ha.fen...@aisino.com" 写道: >设置并行度1确实可以了。env.setParallelism(1); >这里按照key分组,我输入的key都是同一个值,应该在同一个分区。 >watermark.keyBy(item -> item.itemid) >我理解的就是处理环节其实就是单并行度的,没有问题。问题出在socket这种方式必须使用单并行度? >发件人: haishui >发送时间: 2024-01-03 13:35 >收件人: user-zh >主题: Re:回复: Re: 滑动窗口按照处理时间触发的问题 >Hi, > > >应该是并行度的原因,你可以先将并行度设置为1试试。 > > > > >Best regards, >haishui > > > > > >在 2024-01-03 12:24:20,"ha.fen...@aisino.com" 写道: >>帮我看看代码,感觉是代码的问题,使用滚动窗口问题一样,5分钟的滚动,也是输入1704130441000才触发函数的 >>public static void main(String[] args) throws Exception { >>StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> >>SingleOutputStreamOperator >> userItemSingleOutputStreamOperator = env >>.socketTextStream("172.24.6.109", ) >>.flatMap(new Splitter()); >> >>WatermarkStrategy watermarkStrategy = WatermarkStrategy >>.forMonotonousTimestamps() >>.withTimestampAssigner(new >> SerializableTimestampAssigner() { >>@Override >>public long extractTimestamp(UserItem element, long >> recordTimestamp) { >>System.out.println(element.timestamp); >>return element.timestamp; >>} >>}); >>SingleOutputStreamOperator watermark = >> userItemSingleOutputStreamOperator.assignTimestampsAndWatermarks(watermarkStrategy); >>watermark.keyBy(item -> item.itemid) >> // >> .window(SlidingEventTimeWindows.of(Time.minutes(60),Time.minutes(5))) >>.window(TumblingEventTimeWindows.of(Time.minutes(5))) >>.aggregate(new AggregateFunctionMethod(), new MyProcess()) >>.print(); >> >>env.execute(); >>} >>public static class Splitter implements FlatMapFunction >> { >>@Override >>public void flatMap(String sentence, Collector out) throws >> Exception { >>String[] words = sentence.split(","); >>out.collect(new >> UserItem(Integer.parseInt(words[0]),Integer.parseInt(words[1]),Long.parseLong(words[2]))); >>} >>} >> >>public static class AggregateFunctionMethod implements >> AggregateFunction { >>@Override >>public Integer createAccumulator() { >>return 0; >>} >> >>@Override >>public Integer add(UserItem item, Integer i) { >>return i+1; >>} >> >>@Override >>public Integer getResult(Integer i) { >>return i; >>} >> >>@Override >>public Integer merge(Integer i, Integer acc1) { >>return i+acc1; >>} >>} >> >>public static class MyProcess extends >> ProcessWindowFunction { >> >>@Override >>public void process(Integer s, Context context, Iterable >> elements, Collector out) throws Exception { >>long startTs = context.window().getStart(); >>long endTs = context.window().getEnd(); >>String windowStart = DateFormatUtils.format(startTs, "-MM-dd >> HH:mm:ss.SSS"); >>String windowEnd = DateFormatUtils.format(endTs, "-MM-dd >> HH:mm:ss.SSS"); >>StringBuilder sb = new StringBuilder(); >>sb.append("窗口["+windowStart+","+windowEnd); >>out.collect(sb.toString()); >>} >>} >> >>发件人: Xuyang >>发送时间: 2024-01-03 09:36 >>收件人: user-zh >>主题: Re:Re: 滑动窗口按照处理时间触发的问题 >>Hi, >>基本思路和Jinsui说的差不多,我怀疑也是watermark没有推进导致窗口没有开窗。具体可以debug一下EventTimeTrigger里的‘onElement’方法和‘onEventTime’方法。 >> >> >> >> >>-- >> >>Best! >>Xuyang >> >> >> >> >> >>在 2024-01-02 23:31:54,"Jinsui Chen" 写道: >>>Hi, >>> >>>请问是否可以将所有代码贴出来,尤其是水位线相关的。因为事件时间的推进和水位线策略紧密相关。 >>> >>>假设这样一种情况,将时间戳作为事件时间,假设你的水位线容错间隔设置为10min,就会出现上述情况,原因如下: >>>1. 首先是时间窗口的对齐逻辑。窗口是根据 Epoch 时间(1970-01-01 00:00:00 >>>UTC)来对齐的。例如,如果窗口大小为5分钟,那么窗口的开始时间会是00:00、00:05、00:10等等很整的值,而不是事件时间。这也是为什么你的第一条数据会落在 >>>00:20 - 01:20 这个时间窗口上。 >>>2. 对于事件时间窗口,触发窗口计算的时机是‘水位线大于窗口结束时间’,也就是需要一条事件时间在 01:30 后的数据才会触发 00:20 - >>>01:20 这个窗口。如果想要触发 1704129661000 对应的 00:25 - 01:25 窗口,需要一条事件时间大于 01:35 >>>的数据,而现实是没有这样的一条数据。 >>> >>>我猜测你的水位线容错间隔是10min-14min 的值,导致了上述情况。 >>> >>>Best regards, >>>Jinsui >>> >>>ha.fen...@aisino.com 于2024年1月2日周二 20:17写道: >>> 程序是一个滑动窗口SlidingEventTimeWindows.of(Time.minutes(60),Time.minutes(5)), 处理函数aggregate(new AggregateFunctionMethod(), new MyProcess())。 我使用socket方式进行录入数据 2024-01-02 01:19:01 1704129541000 2024-01-02 01:21:01 1704129661000 2024-01-02 01:26:01 1704129961000 2024-01-02 01:29:01 1704130141000 2024-01-02 01:34:01 1704130441000 前面是对应的时间,后面是我录入系统的时间 MyProcess类触发的时间是最后一次录入1704130441000的时候,输出窗口时间为 2024-01-02 00
Re:回复: Re: 滑动窗口按照处理时间触发的问题
Hi, 应该是并行度的原因,你可以先将并行度设置为1试试。 Best regards, haishui 在 2024-01-03 12:24:20,"ha.fen...@aisino.com" 写道: >帮我看看代码,感觉是代码的问题,使用滚动窗口问题一样,5分钟的滚动,也是输入1704130441000才触发函数的 >public static void main(String[] args) throws Exception { >StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > >SingleOutputStreamOperator > userItemSingleOutputStreamOperator = env >.socketTextStream("172.24.6.109", ) >.flatMap(new Splitter()); > >WatermarkStrategy watermarkStrategy = WatermarkStrategy >.forMonotonousTimestamps() >.withTimestampAssigner(new > SerializableTimestampAssigner() { >@Override >public long extractTimestamp(UserItem element, long > recordTimestamp) { >System.out.println(element.timestamp); >return element.timestamp; >} >}); >SingleOutputStreamOperator watermark = > userItemSingleOutputStreamOperator.assignTimestampsAndWatermarks(watermarkStrategy); >watermark.keyBy(item -> item.itemid) > // > .window(SlidingEventTimeWindows.of(Time.minutes(60),Time.minutes(5))) >.window(TumblingEventTimeWindows.of(Time.minutes(5))) >.aggregate(new AggregateFunctionMethod(), new MyProcess()) >.print(); > >env.execute(); >} >public static class Splitter implements FlatMapFunction { >@Override >public void flatMap(String sentence, Collector out) throws > Exception { >String[] words = sentence.split(","); >out.collect(new > UserItem(Integer.parseInt(words[0]),Integer.parseInt(words[1]),Long.parseLong(words[2]))); >} >} > >public static class AggregateFunctionMethod implements > AggregateFunction { >@Override >public Integer createAccumulator() { >return 0; >} > >@Override >public Integer add(UserItem item, Integer i) { >return i+1; >} > >@Override >public Integer getResult(Integer i) { >return i; >} > >@Override >public Integer merge(Integer i, Integer acc1) { >return i+acc1; >} >} > >public static class MyProcess extends > ProcessWindowFunction { > >@Override >public void process(Integer s, Context context, Iterable > elements, Collector out) throws Exception { >long startTs = context.window().getStart(); >long endTs = context.window().getEnd(); >String windowStart = DateFormatUtils.format(startTs, "-MM-dd > HH:mm:ss.SSS"); >String windowEnd = DateFormatUtils.format(endTs, "-MM-dd > HH:mm:ss.SSS"); >StringBuilder sb = new StringBuilder(); >sb.append("窗口["+windowStart+","+windowEnd); >out.collect(sb.toString()); >} >} > >发件人: Xuyang >发送时间: 2024-01-03 09:36 >收件人: user-zh >主题: Re:Re: 滑动窗口按照处理时间触发的问题 >Hi, >基本思路和Jinsui说的差不多,我怀疑也是watermark没有推进导致窗口没有开窗。具体可以debug一下EventTimeTrigger里的‘onElement’方法和‘onEventTime’方法。 > > > > >-- > >Best! >Xuyang > > > > > >在 2024-01-02 23:31:54,"Jinsui Chen" 写道: >>Hi, >> >>请问是否可以将所有代码贴出来,尤其是水位线相关的。因为事件时间的推进和水位线策略紧密相关。 >> >>假设这样一种情况,将时间戳作为事件时间,假设你的水位线容错间隔设置为10min,就会出现上述情况,原因如下: >>1. 首先是时间窗口的对齐逻辑。窗口是根据 Epoch 时间(1970-01-01 00:00:00 >>UTC)来对齐的。例如,如果窗口大小为5分钟,那么窗口的开始时间会是00:00、00:05、00:10等等很整的值,而不是事件时间。这也是为什么你的第一条数据会落在 >>00:20 - 01:20 这个时间窗口上。 >>2. 对于事件时间窗口,触发窗口计算的时机是‘水位线大于窗口结束时间’,也就是需要一条事件时间在 01:30 后的数据才会触发 00:20 - >>01:20 这个窗口。如果想要触发 1704129661000 对应的 00:25 - 01:25 窗口,需要一条事件时间大于 01:35 >>的数据,而现实是没有这样的一条数据。 >> >>我猜测你的水位线容错间隔是10min-14min 的值,导致了上述情况。 >> >>Best regards, >>Jinsui >> >>ha.fen...@aisino.com 于2024年1月2日周二 20:17写道: >> >>> >>> 程序是一个滑动窗口SlidingEventTimeWindows.of(Time.minutes(60),Time.minutes(5)), >>> 处理函数aggregate(new AggregateFunctionMethod(), new MyProcess())。 >>> 我使用socket方式进行录入数据 >>> 2024-01-02 01:19:01 1704129541000 >>> 2024-01-02 01:21:01 1704129661000 >>> 2024-01-02 01:26:01 1704129961000 >>> 2024-01-02 01:29:01 1704130141000 >>> 2024-01-02 01:34:01 1704130441000 >>> 前面是对应的时间,后面是我录入系统的时间 >>> MyProcess类触发的时间是最后一次录入1704130441000的时候,输出窗口时间为 >>> 2024-01-02 00:20:00.000,2024-01-02 01:20:00.000 >>> 我认为应该录入1704129661000的时候就应该触发窗口函数了,但是并没有,所以我想问5分钟触发窗口的时间到底是怎么回事? >>>
Re:Re: 滑动窗口按照处理时间触发的问题
Hi, 基本思路和Jinsui说的差不多,我怀疑也是watermark没有推进导致窗口没有开窗。具体可以debug一下EventTimeTrigger里的‘onElement’方法和‘onEventTime’方法。 -- Best! Xuyang 在 2024-01-02 23:31:54,"Jinsui Chen" 写道: >Hi, > >请问是否可以将所有代码贴出来,尤其是水位线相关的。因为事件时间的推进和水位线策略紧密相关。 > >假设这样一种情况,将时间戳作为事件时间,假设你的水位线容错间隔设置为10min,就会出现上述情况,原因如下: >1. 首先是时间窗口的对齐逻辑。窗口是根据 Epoch 时间(1970-01-01 00:00:00 >UTC)来对齐的。例如,如果窗口大小为5分钟,那么窗口的开始时间会是00:00、00:05、00:10等等很整的值,而不是事件时间。这也是为什么你的第一条数据会落在 >00:20 - 01:20 这个时间窗口上。 >2. 对于事件时间窗口,触发窗口计算的时机是‘水位线大于窗口结束时间’,也就是需要一条事件时间在 01:30 后的数据才会触发 00:20 - >01:20 这个窗口。如果想要触发 1704129661000 对应的 00:25 - 01:25 窗口,需要一条事件时间大于 01:35 >的数据,而现实是没有这样的一条数据。 > >我猜测你的水位线容错间隔是10min-14min 的值,导致了上述情况。 > >Best regards, >Jinsui > >ha.fen...@aisino.com 于2024年1月2日周二 20:17写道: > >> >> 程序是一个滑动窗口SlidingEventTimeWindows.of(Time.minutes(60),Time.minutes(5)), >> 处理函数aggregate(new AggregateFunctionMethod(), new MyProcess())。 >> 我使用socket方式进行录入数据 >> 2024-01-02 01:19:01 1704129541000 >> 2024-01-02 01:21:01 1704129661000 >> 2024-01-02 01:26:01 1704129961000 >> 2024-01-02 01:29:01 1704130141000 >> 2024-01-02 01:34:01 1704130441000 >> 前面是对应的时间,后面是我录入系统的时间 >> MyProcess类触发的时间是最后一次录入1704130441000的时候,输出窗口时间为 >> 2024-01-02 00:20:00.000,2024-01-02 01:20:00.000 >> 我认为应该录入1704129661000的时候就应该触发窗口函数了,但是并没有,所以我想问5分钟触发窗口的时间到底是怎么回事? >>
Re: 滑动窗口按照处理时间触发的问题
Hi, 请问是否可以将所有代码贴出来,尤其是水位线相关的。因为事件时间的推进和水位线策略紧密相关。 假设这样一种情况,将时间戳作为事件时间,假设你的水位线容错间隔设置为10min,就会出现上述情况,原因如下: 1. 首先是时间窗口的对齐逻辑。窗口是根据 Epoch 时间(1970-01-01 00:00:00 UTC)来对齐的。例如,如果窗口大小为5分钟,那么窗口的开始时间会是00:00、00:05、00:10等等很整的值,而不是事件时间。这也是为什么你的第一条数据会落在 00:20 - 01:20 这个时间窗口上。 2. 对于事件时间窗口,触发窗口计算的时机是‘水位线大于窗口结束时间’,也就是需要一条事件时间在 01:30 后的数据才会触发 00:20 - 01:20 这个窗口。如果想要触发 1704129661000 对应的 00:25 - 01:25 窗口,需要一条事件时间大于 01:35 的数据,而现实是没有这样的一条数据。 我猜测你的水位线容错间隔是10min-14min 的值,导致了上述情况。 Best regards, Jinsui ha.fen...@aisino.com 于2024年1月2日周二 20:17写道: > > 程序是一个滑动窗口SlidingEventTimeWindows.of(Time.minutes(60),Time.minutes(5)), > 处理函数aggregate(new AggregateFunctionMethod(), new MyProcess())。 > 我使用socket方式进行录入数据 > 2024-01-02 01:19:01 1704129541000 > 2024-01-02 01:21:01 1704129661000 > 2024-01-02 01:26:01 1704129961000 > 2024-01-02 01:29:01 1704130141000 > 2024-01-02 01:34:01 1704130441000 > 前面是对应的时间,后面是我录入系统的时间 > MyProcess类触发的时间是最后一次录入1704130441000的时候,输出窗口时间为 > 2024-01-02 00:20:00.000,2024-01-02 01:20:00.000 > 我认为应该录入1704129661000的时候就应该触发窗口函数了,但是并没有,所以我想问5分钟触发窗口的时间到底是怎么回事? >
RE: 如何在IDE里面调试flink cdc 3.0作业?
Hi, 可以参考下这篇文档[1],进行简单的测试。 Best, Jiabao [1] https://docs.google.com/document/d/1L6cJiqYkAsZ_nDa3MgRwV3SKQuw5OrMbqGC4YgzgKR4/edit#heading=h.aybxdd96r62i On 2024/01/02 08:02:10 "casel.chen" wrote: > 我想在Intellij Idea里面调试mysql-to-doris.yaml作业要如何操作呢?flink-cdc-cli模块需要依赖 > flink-cdc-pipeline-connector-mysql 和 flink-cdc-pipeline-connector-doris 模块么?
如何在IDE里面调试flink cdc 3.0作业?
我想在Intellij Idea里面调试mysql-to-doris.yaml作业要如何操作呢?flink-cdc-cli模块需要依赖 flink-cdc-pipeline-connector-mysql 和 flink-cdc-pipeline-connector-doris 模块么?