Re: flink1.13.2 -> 1.13.6 job CPU spikes from about 50% to 1000%, window operator malfunction.

2022-06-27 Thread yidan zhao
Meanwhile, it seems that building on ContinuousEventTimeTrigger does not work either.
For window1 of key1, when the first element arrives, the registered window.maxTimestamp timer will only fire 10 hours later.
The timestamp - (timestamp % interval) registered by ContinuousEventTimeTrigger
would likewise only fire roughly 10 hours later.


As things stand, either I override the implementation again to restore the original logic, or I re-implement an event-time-based
ContinuousEventTimeTrigger whose repeatedly registered timestamps are no longer derived from the element
timestamp but from ctx.getCurrentWatermark. In effect I would treat the watermark as processing time and implement a
continuousWatermarkTrigger. That should bring the code logic back to normal, but it still cannot solve the problem I mentioned in my previous mail: when long-delayed data catches up quickly, it causes a flood of event-time
trigger firings.
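
A minimal sketch of what such a watermark-driven trigger could look like (hypothetical class and field names, simplified, no guarantees; intervalMs is assumed to be milliseconds):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Hypothetical sketch of the proposed "continuousWatermarkTrigger": timers are
// derived from the current watermark instead of the element timestamp, so the
// periodic firing keeps pace with watermark progress.
public class ContinuousWatermarkTrigger extends Trigger<Object, TimeWindow> {

    private final long intervalMs;

    private ContinuousWatermarkTrigger(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    public static ContinuousWatermarkTrigger of(long intervalMs) {
        return new ContinuousWatermarkTrigger(intervalMs);
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp,
                                   TimeWindow window, TriggerContext ctx) throws Exception {
        long wm = ctx.getCurrentWatermark();
        if (wm == Long.MIN_VALUE) {
            // no watermark yet, nothing to align to
            return TriggerResult.CONTINUE;
        }
        // next firing point aligned to the interval, relative to the watermark
        long nextFire = wm - (wm % intervalMs) + intervalMs;
        ctx.registerEventTimeTimer(Math.min(nextFire, window.maxTimestamp()));
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        if (time < window.maxTimestamp()) {
            // keep firing periodically until the end-of-window timer takes over
            ctx.registerEventTimeTimer(Math.min(time + intervalMs, window.maxTimestamp()));
        }
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}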

yidan zhao wrote on Tue, Jun 28, 2022, at 12:40:
>
> I thought back carefully to why I originally rebuilt the trigger as a combination of eventTimeTrigger and
> continuousProcessTimeTrigger. It is because my watermark is latestTs minus 10 hours. I considered the case where data is delayed by roughly 10 hours: when I recover and the backlog flows back in, using
> continuousEventTimeTrigger would mean 10 hours' worth of data rushing in quickly causes a large number of event time
> trigger firings: for a single key and a single window, roughly 10 hours / 10 s = 3600 firings in a short time, so across all keys and all windows it would certainly explode.
> Given that, I wanted the continuous trigger to be based on processing time, which is why I made this modification.
>
> yidan zhao wrote on Tue, Jun 28, 2022, at 12:25:
> >
> > Right after posting the code, I worked out the problem; see below.
> > I had a look: the main change is the timing logic when registering the trigger in ContinuousProcessingTimeTrigger, which now adds
> > long nextFireTimestamp = Math.min(time + interval,
> > window.maxTimestamp()); i.e. a take-the-minimum step.
> > The watermark of this job is latestTs minus 10 hours; the job is special in that the load is low but the data-completeness requirement is high, so this is what I settled on.
> > At the same time, to avoid results only being emitted 10 hours later (the requirement is to emit results before the window closes, once every 10 s), I adopted the continuousProcessTrigger
> > approach.
> >
> > My analysis: for window 1 of a given key, the trigger fires when its timer comes due, fires the next round, then registers a timer at the same time again and fires immediately, an endless loop. This goes on for 10 hours for every window of every key, because a window only closes 10 hours later.
> >
> >
> >
> > I am not sure what the purpose of adding this logic is. ContinuousProcessingTimeTrigger fires based on processing time, whereas
> > window.maxTimestamp() in my scenario is event time.
> > I traced it to https://issues.apache.org/jira/browse/FLINK-20443, but I could not tell what conclusion the discussion reached; it does not look like a bug either, so why was this change needed?
> >
> > yidan zhao wrote on Tue, Jun 28, 2022, at 11:48:
> > >
> > > A comparison experiment shows that the problem is the custom trigger,
> > > though I do not know what changed from 1.13.2 to 1.13.6. My custom trigger is below; internally it simply composes EventTimeTrigger and ContinuousProcessingTimeTrigger:
> > >
> > > public class ContinuousProcessTimeTriggerForEventTimeWindow
> > > extends Trigger {
> > >
> > > private final EventTimeTrigger eventTimeTrigger;
> > >
> > > private final ContinuousProcessingTimeTrigger
> > > continuousProcessTimeTrigger;
> > >
> > > public static ContinuousProcessTimeTriggerForEventTimeWindow
> > > of(long windowUpdateTimeInSeconds) {
> > > return new
> > > ContinuousProcessTimeTriggerForEventTimeWindow(windowUpdateTimeInSeconds);
> > > }
> > >
> > > private ContinuousProcessTimeTriggerForEventTimeWindow(long
> > > windowUpdateTimeInSeconds) {
> > > eventTimeTrigger = EventTimeTrigger.create();
> > > continuousProcessTimeTrigger = ContinuousProcessingTimeTrigger.of(
> > > Time.seconds(windowUpdateTimeInSeconds)
> > > );
> > > }
> > >
> > > @Override
> > > public TriggerResult onElement(
> > > Object element, long timestamp, TimeWindow window,
> > > TriggerContext ctx
> > > ) throws Exception {
> > > continuousProcessTimeTrigger.onElement(element, timestamp, 
> > > window, ctx);
> > > return eventTimeTrigger.onElement(element, timestamp, window, 
> > > ctx);
> > > }
> > >
> > > @Override
> > > public TriggerResult onEventTime(
> > > long time, TimeWindow window, TriggerContext ctx
> > > ) throws Exception {
> > > return eventTimeTrigger.onEventTime(time, window, ctx);
> > > }
> > >
> > > @Override
> > > public TriggerResult onProcessingTime(
> > > long time, TimeWindow window, TriggerContext ctx
> > > ) throws Exception {
> > > return continuousProcessTimeTrigger.onProcessingTime(time, 
> > > window, ctx);
> > > }
> > >
> > > @Override
> > > public void clear(
> > > TimeWindow window, TriggerContext ctx
> > > ) throws Exception {
> > > eventTimeTrigger.clear(window, ctx);
> > > continuousProcessTimeTrigger.clear(window, ctx);
> > > }
> > >
> > > @Override
> > > public boolean canMerge() {
> > > return true;
> > > }
> > >
> > > @Override
> > > public void onMerge(
> > > TimeWindow window, OnMergeContext ctx
> > > ) throws Exception {
> > > eventTimeTrigger.onMerge(window, ctx);
> > > continuousProcessTimeTrigger.onMerge(window, ctx);
> > > }
> > > }
> > >
> > > Shengkai Fang wrote on Tue, Jun 28, 2022, at 10:51:
> > > >
> > > > Hi.
> > > >
> > > > In this situation you can use jprofiler to see where the CPU is actually being spent. You can also use a flame graph or jstack to look at the concrete stacks and their usage.
> > > >
> > > > Best,
> > > > Shengkai
> > > >
> > > > yidan zhao wrote on Tue, Jun 28, 2022, at 10:44:
> > > >
> > > > > The symptom is as in the subject. The job simply reads data from a kafkaSource, does some light filtering, then a window, then writes the output to mysql.
> > > > >
> > > > > From what I can see, the CPU becomes abnormal 1-2 min after the job starts, not immediately. While it is abnormal, the window operator's busy metric is at 100%.
> > > > > The window is an event time window combined with a custom
> > > > > continuousProcessTriggerForEventTimeWindow (continuous triggering based on processing time, while the aggregation window itself is an event-time window).
> > > > >
> > > > > How should I troubleshoot this? It looks like something is stuck: after cancelling the job, it simply waits until the TM fails. The window operator never cancels successfully.
> > > > >


Re: flink1.13.2 -> 1.13.6 job CPU spikes from about 50% to 1000%, window operator malfunction.

2022-06-27 Thread yidan zhao
I thought back carefully to why I originally rebuilt the trigger as a combination of eventTimeTrigger and
continuousProcessTimeTrigger. It is because my watermark is latestTs minus 10 hours. I considered the case where data is delayed by roughly 10 hours: when I recover and the backlog flows back in, using
continuousEventTimeTrigger would mean 10 hours' worth of data rushing in quickly causes a large number of event time
trigger firings: for a single key and a single window, roughly 10 hours / 10 s = 3600 firings in a short time, so across all keys and all windows it would certainly explode.
Given that, I wanted the continuous trigger to be based on processing time, which is why I made this modification.

yidan zhao wrote on Tue, Jun 28, 2022, at 12:25:
>
> Right after posting the code, I worked out the problem; see below.
> I had a look: the main change is the timing logic when registering the trigger in ContinuousProcessingTimeTrigger, which now adds
> long nextFireTimestamp = Math.min(time + interval,
> window.maxTimestamp()); i.e. a take-the-minimum step.
> The watermark of this job is latestTs minus 10 hours; the job is special in that the load is low but the data-completeness requirement is high, so this is what I settled on.
> At the same time, to avoid results only being emitted 10 hours later (the requirement is to emit results before the window closes, once every 10 s), I adopted the continuousProcessTrigger
> approach.
>
> My analysis: for window 1 of a given key, the trigger fires when its timer comes due, fires the next round, then registers a timer at the same time again and fires immediately, an endless loop. This goes on for 10 hours for every window of every key, because a window only closes 10 hours later.
>
>
>
> I am not sure what the purpose of adding this logic is. ContinuousProcessingTimeTrigger fires based on processing time, whereas
> window.maxTimestamp() in my scenario is event time.
> I traced it to https://issues.apache.org/jira/browse/FLINK-20443, but I could not tell what conclusion the discussion reached; it does not look like a bug either, so why was this change needed?
>
> yidan zhao wrote on Tue, Jun 28, 2022, at 11:48:
> >
> > A comparison experiment shows that the problem is the custom trigger,
> > though I do not know what changed from 1.13.2 to 1.13.6. My custom trigger is below; internally it simply composes EventTimeTrigger and ContinuousProcessingTimeTrigger:
> >
> > public class ContinuousProcessTimeTriggerForEventTimeWindow
> > extends Trigger {
> >
> > private final EventTimeTrigger eventTimeTrigger;
> >
> > private final ContinuousProcessingTimeTrigger
> > continuousProcessTimeTrigger;
> >
> > public static ContinuousProcessTimeTriggerForEventTimeWindow
> > of(long windowUpdateTimeInSeconds) {
> > return new
> > ContinuousProcessTimeTriggerForEventTimeWindow(windowUpdateTimeInSeconds);
> > }
> >
> > private ContinuousProcessTimeTriggerForEventTimeWindow(long
> > windowUpdateTimeInSeconds) {
> > eventTimeTrigger = EventTimeTrigger.create();
> > continuousProcessTimeTrigger = ContinuousProcessingTimeTrigger.of(
> > Time.seconds(windowUpdateTimeInSeconds)
> > );
> > }
> >
> > @Override
> > public TriggerResult onElement(
> > Object element, long timestamp, TimeWindow window,
> > TriggerContext ctx
> > ) throws Exception {
> > continuousProcessTimeTrigger.onElement(element, timestamp, window, 
> > ctx);
> > return eventTimeTrigger.onElement(element, timestamp, window, ctx);
> > }
> >
> > @Override
> > public TriggerResult onEventTime(
> > long time, TimeWindow window, TriggerContext ctx
> > ) throws Exception {
> > return eventTimeTrigger.onEventTime(time, window, ctx);
> > }
> >
> > @Override
> > public TriggerResult onProcessingTime(
> > long time, TimeWindow window, TriggerContext ctx
> > ) throws Exception {
> > return continuousProcessTimeTrigger.onProcessingTime(time, window, 
> > ctx);
> > }
> >
> > @Override
> > public void clear(
> > TimeWindow window, TriggerContext ctx
> > ) throws Exception {
> > eventTimeTrigger.clear(window, ctx);
> > continuousProcessTimeTrigger.clear(window, ctx);
> > }
> >
> > @Override
> > public boolean canMerge() {
> > return true;
> > }
> >
> > @Override
> > public void onMerge(
> > TimeWindow window, OnMergeContext ctx
> > ) throws Exception {
> > eventTimeTrigger.onMerge(window, ctx);
> > continuousProcessTimeTrigger.onMerge(window, ctx);
> > }
> > }
> >
> > Shengkai Fang wrote on Tue, Jun 28, 2022, at 10:51:
> > >
> > > Hi.
> > >
> > > In this situation you can use jprofiler to see where the CPU is actually being spent. You can also use a flame graph or jstack to look at the concrete stacks and their usage.
> > >
> > > Best,
> > > Shengkai
> > >
> > > yidan zhao wrote on Tue, Jun 28, 2022, at 10:44:
> > >
> > > > The symptom is as in the subject. The job simply reads data from a kafkaSource, does some light filtering, then a window, then writes the output to mysql.
> > > >
> > > > From what I can see, the CPU becomes abnormal 1-2 min after the job starts, not immediately. While it is abnormal, the window operator's busy metric is at 100%.
> > > > The window is an event time window combined with a custom
> > > > continuousProcessTriggerForEventTimeWindow (continuous triggering based on processing time, while the aggregation window itself is an event-time window).
> > > >
> > > > How should I troubleshoot this? It looks like something is stuck: after cancelling the job, it simply waits until the TM fails. The window operator never cancels successfully.
> > > >


Re: flink1.13.2 -> 1.13.6 job CPU spikes from about 50% to 1000%, window operator malfunction.

2022-06-27 Thread yidan zhao
Right after posting the code, I worked out the problem; see below.
I had a look: the main change is the timing logic when registering the trigger in ContinuousProcessingTimeTrigger, which now adds
long nextFireTimestamp = Math.min(time + interval,
window.maxTimestamp()); i.e. a take-the-minimum step.
The watermark of this job is latestTs minus 10 hours; the job is special in that the load is low but the data-completeness requirement is high, so this is what I settled on.
At the same time, to avoid results only being emitted 10 hours later (the requirement is to emit results before the window closes, once every 10 s), I adopted the continuousProcessTrigger approach.

My analysis: for window 1 of a given key, the trigger fires when its timer comes due, fires the next round, then registers a timer at the same time again and fires immediately, an endless loop. This goes on for 10 hours for every window of every key, because a window only closes 10 hours later.
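
A simplified sketch (not the actual Flink source, just an illustration consistent with the analysis above) of why the clamped registration spins once window.maxTimestamp() lies behind the current processing time:

// time is the current processing time; interval is the continuous-fire interval
long nextFireTimestamp = Math.min(time + interval, window.maxTimestamp());
ctx.registerProcessingTimeTimer(nextFireTimestamp);
// With a watermark of latestTs - 10h, window.maxTimestamp() (event time) is
// typically far smaller than the processing time, so the registered timer is
// already in the past: it fires immediately, registers the same timestamp
// again, and keeps spinning until the window finally closes 10 hours later.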



I am not sure what the purpose of adding this logic is. ContinuousProcessingTimeTrigger fires based on processing time, whereas
window.maxTimestamp() in my scenario is event time.
I traced it to https://issues.apache.org/jira/browse/FLINK-20443, but I could not tell what conclusion the discussion reached; it does not look like a bug either, so why was this change needed?

yidan zhao wrote on Tue, Jun 28, 2022, at 11:48:
>
> A comparison experiment shows that the problem is the custom trigger,
> though I do not know what changed from 1.13.2 to 1.13.6. My custom trigger is below; internally it simply composes EventTimeTrigger and ContinuousProcessingTimeTrigger:
>
> public class ContinuousProcessTimeTriggerForEventTimeWindow
> extends Trigger {
>
> private final EventTimeTrigger eventTimeTrigger;
>
> private final ContinuousProcessingTimeTrigger
> continuousProcessTimeTrigger;
>
> public static ContinuousProcessTimeTriggerForEventTimeWindow
> of(long windowUpdateTimeInSeconds) {
> return new
> ContinuousProcessTimeTriggerForEventTimeWindow(windowUpdateTimeInSeconds);
> }
>
> private ContinuousProcessTimeTriggerForEventTimeWindow(long
> windowUpdateTimeInSeconds) {
> eventTimeTrigger = EventTimeTrigger.create();
> continuousProcessTimeTrigger = ContinuousProcessingTimeTrigger.of(
> Time.seconds(windowUpdateTimeInSeconds)
> );
> }
>
> @Override
> public TriggerResult onElement(
> Object element, long timestamp, TimeWindow window,
> TriggerContext ctx
> ) throws Exception {
> continuousProcessTimeTrigger.onElement(element, timestamp, window, 
> ctx);
> return eventTimeTrigger.onElement(element, timestamp, window, ctx);
> }
>
> @Override
> public TriggerResult onEventTime(
> long time, TimeWindow window, TriggerContext ctx
> ) throws Exception {
> return eventTimeTrigger.onEventTime(time, window, ctx);
> }
>
> @Override
> public TriggerResult onProcessingTime(
> long time, TimeWindow window, TriggerContext ctx
> ) throws Exception {
> return continuousProcessTimeTrigger.onProcessingTime(time, window, 
> ctx);
> }
>
> @Override
> public void clear(
> TimeWindow window, TriggerContext ctx
> ) throws Exception {
> eventTimeTrigger.clear(window, ctx);
> continuousProcessTimeTrigger.clear(window, ctx);
> }
>
> @Override
> public boolean canMerge() {
> return true;
> }
>
> @Override
> public void onMerge(
> TimeWindow window, OnMergeContext ctx
> ) throws Exception {
> eventTimeTrigger.onMerge(window, ctx);
> continuousProcessTimeTrigger.onMerge(window, ctx);
> }
> }
>
> Shengkai Fang wrote on Tue, Jun 28, 2022, at 10:51:
> >
> > Hi.
> >
> > In this situation you can use jprofiler to see where the CPU is actually being spent. You can also use a flame graph or jstack to look at the concrete stacks and their usage.
> >
> > Best,
> > Shengkai
> >
> > yidan zhao wrote on Tue, Jun 28, 2022, at 10:44:
> >
> > > The symptom is as in the subject. The job simply reads data from a kafkaSource, does some light filtering, then a window, then writes the output to mysql.
> > >
> > > From what I can see, the CPU becomes abnormal 1-2 min after the job starts, not immediately. While it is abnormal, the window operator's busy metric is at 100%.
> > > The window is an event time window combined with a custom
> > > continuousProcessTriggerForEventTimeWindow (continuous triggering based on processing time, while the aggregation window itself is an event-time window).
> > >
> > > How should I troubleshoot this? It looks like something is stuck: after cancelling the job, it simply waits until the TM fails. The window operator never cancels successfully.
> > >


Re: flink1.13.2 -> 1.13.6 job CPU spikes from about 50% to 1000%, window operator malfunction.

2022-06-27 Thread yidan zhao
A comparison experiment shows that the problem is the custom trigger,
though I do not know what changed from 1.13.2 to 1.13.6. My custom trigger is below; internally it simply composes EventTimeTrigger and ContinuousProcessingTimeTrigger:

public class ContinuousProcessTimeTriggerForEventTimeWindow
        extends Trigger<Object, TimeWindow> {

    // fires when the event-time window actually closes
    private final EventTimeTrigger eventTimeTrigger;

    // fires periodically on processing time to emit early results
    private final ContinuousProcessingTimeTrigger<TimeWindow> continuousProcessTimeTrigger;

    public static ContinuousProcessTimeTriggerForEventTimeWindow of(long windowUpdateTimeInSeconds) {
        return new ContinuousProcessTimeTriggerForEventTimeWindow(windowUpdateTimeInSeconds);
    }

    private ContinuousProcessTimeTriggerForEventTimeWindow(long windowUpdateTimeInSeconds) {
        eventTimeTrigger = EventTimeTrigger.create();
        continuousProcessTimeTrigger =
                ContinuousProcessingTimeTrigger.of(Time.seconds(windowUpdateTimeInSeconds));
    }

    @Override
    public TriggerResult onElement(
            Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // let the processing-time trigger register its periodic timer,
        // but take the actual decision from the event-time trigger
        continuousProcessTimeTrigger.onElement(element, timestamp, window, ctx);
        return eventTimeTrigger.onElement(element, timestamp, window, ctx);
    }

    @Override
    public TriggerResult onEventTime(
            long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return eventTimeTrigger.onEventTime(time, window, ctx);
    }

    @Override
    public TriggerResult onProcessingTime(
            long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return continuousProcessTimeTrigger.onProcessingTime(time, window, ctx);
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        eventTimeTrigger.clear(window, ctx);
        continuousProcessTimeTrigger.clear(window, ctx);
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
        eventTimeTrigger.onMerge(window, ctx);
        continuousProcessTimeTrigger.onMerge(window, ctx);
    }
}
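
For reference, a hypothetical usage sketch of how such a trigger is attached to an event-time window (the key selector, window size and aggregate function below are made up, not taken from the original job):

stream
    .keyBy(record -> record.getKey())                                  // hypothetical key selector
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))              // hypothetical window size
    .trigger(ContinuousProcessTimeTriggerForEventTimeWindow.of(10))    // early results every 10 s of processing time
    .aggregate(new MyAggregateFunction());                             // hypothetical aggregation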

Shengkai Fang wrote on Tue, Jun 28, 2022, at 10:51:
>
> Hi.
>
> In this situation you can use jprofiler to see where the CPU is actually being spent. You can also use a flame graph or jstack to look at the concrete stacks and their usage.
>
> Best,
> Shengkai
>
> yidan zhao wrote on Tue, Jun 28, 2022, at 10:44:
>
> > The symptom is as in the subject. The job simply reads data from a kafkaSource, does some light filtering, then a window, then writes the output to mysql.
> >
> > From what I can see, the CPU becomes abnormal 1-2 min after the job starts, not immediately. While it is abnormal, the window operator's busy metric is at 100%.
> > The window is an event time window combined with a custom
> > continuousProcessTriggerForEventTimeWindow (continuous triggering based on processing time, while the aggregation window itself is an event-time window).
> >
> > How should I troubleshoot this? It looks like something is stuck: after cancelling the job, it simply waits until the TM fails. The window operator never cancels successfully.
> >


Re: How to clean up RocksDB local directory in K8s statefulset

2022-06-27 Thread yanfei lei
Hi Allen, what volumes do you use for your TM pod? If you want your data to
be deleted when the pod restarts, you can use an ephemeral volume like
EmptyDir.
And Flink should remove temporary files automatically when they are not
needed anymore (see this discussion).

The working directory only takes effect from Flink 1.15 onwards; in Flink 1.14 the local RocksDB
directory is usually located under the /tmp directory if you
don't specifically configure state.backend.rocksdb.localdir.
So the working directory can't help here.

Allen Wang wrote on Tue, Jun 28, 2022, at 04:39:

> Hi Folks,
>
> We created a stateful job using SessionWindow and RocksDB state backend
> and deployed it on Kubernetes Statefulset with persisted volumes. The Flink
> version we used is 1.14.
>
> After the job runs for some time, we observed that the size of the local
> RocksDB directory started to grow and there are more and more
> directories created inside it. It seems that when the job is restarted or
> the task manager K8s pod is restarted, the previous RocksDB directory
> corresponding to the assigned operator is not cleaned up. Here is an
> example:
>
> drwxr-xr-x 3 root root 4096 Jun 27 18:23
> job__op_WindowOperator_2b0a50a068bb7f1c8a470e4f763cbf26__1_4__uuid_c97f3f3f-649a-467d-82af-2bc250ec6e22
> drwxr-xr-x 3 root root 4096 Jun 27 18:45
> job__op_WindowOperator_2b0a50a068bb7f1c8a470e4f763cbf26__1_4__uuid_e4fca2c3-74c7-4aa2-9ca1-dda866b8de11
> drwxr-xr-x 3 root root 4096 Jun 27 18:56
> job__op_WindowOperator_2b0a50a068bb7f1c8a470e4f763cbf26__2_4__uuid_f1fa-7402-494d-80d7-65861394710c
> drwxr-xr-x 3 root root 4096 Jun 27 17:34
> job__op_WindowOperator_f6dc7f4d2283f4605b127b9364e21148__3_4__uuid_08a14423-bea1-44ce-96ee-360a516d72a6
>
> Although only
> job__op_WindowOperator_2b0a50a068bb7f1c8a470e4f763cbf26__2_4__uuid_f1fa-7402-494d-80d7-65861394710c
> is the active running operator, the other directories for the past
> operators still exist.
>
> We set up the task manager property taskmanager.resource-id to be the task
> manager pod name under the statefulset but it did not seem to help cleaning
> up previous directories.
>
> Any pointers to solve this issue?
>
> We checked the latest document and it seems that Flink 1.15 introduced the
> concept of local working directory:
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/working_directory/.
> Does that help cleaning up the RocksDB directory?
>
> Thanks,
> Allen
>
>
>
>
>


Re: flink1.13.2 -> 1.13.6 job CPU spikes from about 50% to 1000%, window operator malfunction.

2022-06-27 Thread Shengkai Fang
Hi.

In this situation you can use jprofiler to see where the CPU is actually being spent. You can also use a flame graph or jstack to look at the concrete stacks and their usage.

Best,
Shengkai

yidan zhao wrote on Tue, Jun 28, 2022, at 10:44:

> The symptom is as in the subject. The job simply reads data from a kafkaSource, does some light filtering, then a window, then writes the output to mysql.
>
> From what I can see, the CPU becomes abnormal 1-2 min after the job starts, not immediately. While it is abnormal, the window operator's busy metric is at 100%.
> The window is an event time window combined with a custom
> continuousProcessTriggerForEventTimeWindow (continuous triggering based on processing time, while the aggregation window itself is an event-time window).
>
> How should I troubleshoot this? It looks like something is stuck: after cancelling the job, it simply waits until the TM fails. The window operator never cancels successfully.
>


flink1.13.2 -> 1.13.6 job CPU spikes from about 50% to 1000%, window operator malfunction.

2022-06-27 Thread yidan zhao
The symptom is as in the subject. The job simply reads data from a kafkaSource, does some light filtering, then a window, then writes the output to mysql.

From what I can see, the CPU becomes abnormal 1-2 min after the job starts, not immediately. While it is abnormal, the window operator's busy metric is at 100%.
The window is an event time window combined with a custom
continuousProcessTriggerForEventTimeWindow (continuous triggering based on processing time, while the aggregation window itself is an event-time window).

How should I troubleshoot this? It looks like something is stuck: after cancelling the job, it simply waits until the TM fails. The window operator never cancels successfully.


Re: Flink k8s job submission process

2022-06-27 Thread Lijie Wang
Hi,

For the user documentation, see:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes
For the design document, see:
https://docs.google.com/document/d/1-jNzqGF6NfZuwVaFICoFQ5HFFXzF5NVIagUZByFMfBY/edit?usp=sharing
jira: https://issues.apache.org/jira/browse/FLINK-9953

Best,
Lijie

hjw <1010445...@qq.com.invalid> wrote on Tue, Jun 28, 2022, at 00:11:

> Flink version: 1.15.0
> How is the job submission flow for Flink on native k8s implemented in version 1.15.0? Or, put differently, how is Flink on native k8s
> designed? I would like to learn about it; if anyone has relevant documentation or materials, please let me know. Thanks :)
> 


Flink-Kubernetes-operator error.

2022-06-27 Thread Murphy, Matthew
Hello Folks,



Having an issue with the flink-kubernetes-operator.

We are trying to delete an existing deployment in a customer namespace.



The operator doesn't seem to pick up the deletion.



Operator Version 1.0.0
Kubernetes v1.21.6

I re-cycled the operator pod and see this.

Seeing this error in operator logs:




2022-06-27 20:19:10,561 o.a.f.k.o.i.InformerManager[INFO ] Created session 
job informers for [allNamespace]

2022-06-27 20:19:10,668 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][supplychain-nwsc/nwsc-stream-processor-dev01] Deleting FlinkDeployment

2022-06-27 20:19:10,682 i.j.o.p.e.ReconciliationDispatcher 
[ERROR][supplychain-nwsc/nwsc-stream-processor-dev01] Error during event 
processing ExecutionScope{ resource id: 
CustomResourceID{name='nwsc-stream-processor-dev01', 
namespace='supplychain-nwsc'}, version: 53209979} failed.

java.lang.RuntimeException: Cannot create observe config before first 
deployment, this indicates a bug.

at 
org.apache.flink.kubernetes.operator.config.FlinkConfigManager.getObserveConfig(FlinkConfigManager.java:137)

at 
org.apache.flink.kubernetes.operator.service.FlinkService.cancelJob(FlinkService.java:357)

at 
org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.shutdown(ApplicationReconciler.java:327)

at 
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractDeploymentReconciler.cleanup(AbstractDeploymentReconciler.java:56)

at 
org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractDeploymentReconciler.cleanup(AbstractDeploymentReconciler.java:37)

at 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.cleanup(FlinkDeploymentController.java:107)

at 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.cleanup(FlinkDeploymentController.java:59)

at 
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:68)

at 
io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:50)

at 
io.javaoperatorsdk.operator.api.monitoring.Metrics.timeControllerExecution(Metrics.java:34)

at 
io.javaoperatorsdk.operator.processing.Controller.cleanup(Controller.java:49)

at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleCleanup(ReconciliationDispatcher.java:252)

at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:72)

at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:50)

at 
io.javaoperatorsdk.operator.processing.event.EventProcessor$ControllerExecution.run(EventProcessor.java:349)

at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)

at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)

at java.base/java.lang.Thread.run(Unknown Source)



Re: How to convert Table containing TIMESTAMP_LTZ into DataStream in PyFlink 1.15.0?

2022-06-27 Thread John Tipper
Hi Dian,

Thanks, much appreciated.

Kind regards,

John

Sent from my iPhone

On 27 Jun 2022, at 03:43, Dian Fu  wrote:


Hi John,

This seems like a bug and I have created a ticket 
https://issues.apache.org/jira/browse/FLINK-28253 to track it.

For now, you could try replacing to_data_stream with to_append_stream to see
if it works.

Regards,
Dian

On Sat, Jun 25, 2022 at 4:07 AM John Tipper <john_tip...@hotmail.com> wrote:
Hi,

I have a source table using a Kinesis connector reading events from AWS 
EventBridge using PyFlink 1.15.0. An example of the sorts of data that are in 
this stream is here: 
https://docs.aws.amazon.com/codebuild/latest/userguide/sample-build-notifications.html#sample-build-notifications-ref.
 Note that the stream of data contains many different types of events, where 
the 'detail' field is completely different between different event types. There 
is no support for this connector using PyFlink DataStream API, so I use the 
Table API to construct the source table.  The table looks like this:


CREATE TABLE events (
 `id` VARCHAR,
 `source` VARCHAR,
 `account` VARCHAR,
 `region` VARCHAR,
 `detail-type` VARCHAR,
 `detail` VARCHAR,
 `resources` VARCHAR,
 `time` TIMESTAMP(0) WITH LOCAL TIME ZONE,
 WATERMARK FOR `time` as `time` - INTERVAL '30' SECOND,
 PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
...
)


The table was created using:

 table_env.execute_sql(CREATE_STRING_ABOVE)

I'd like to turn this table into a data stream so I can perform some processing 
that is easier to do in the DataStream API:


events_stream_table = table_env.from_path('events')

events_stream = table_env.to_data_stream(events_stream_table)

# now do some processing - let's filter by the type of event we get

codebuild_stream = events_stream.filter(
lambda event: event['source'] == 'aws.codebuild'
)

# now do other stuff on a stream containing only events that are identical in 
shape
...
# maybe convert back into a Table and perform SQL on the data


When I run this, I get an exception:



org.apache.flink.table.api.TableException: Unsupported conversion from data type
'TIMESTAMP(6) WITH TIME ZONE' (conversion class: java.time.OffsetDateTime) to
type information. Only data types that originated from type information fully
support a reverse conversion.

Somebody reported a similar error here
(https://stackoverflow.com/questions/58936529/using-jdbctablesource-with-streamtableenvironment-gives-classcastexception).
When I try the suggestion there and replace the "TIMESTAMP(0) WITH LOCAL TIME
ZONE" with a "TIMESTAMP(3)" I get a different exception:

TypeError: The java type info: LocalDateTime is not supported in PyFlink 
currently.

Is there a way of converting this Table into a DataStream (and then back
again)? I need to use the data in the "time" field as the source of watermarks
for my events.

Many thanks,

John


How to clean up RocksDB local directory in K8s statefulset

2022-06-27 Thread Allen Wang
Hi Folks,

We created a stateful job using SessionWindow and RocksDB state backend and
deployed it on Kubernetes Statefulset with persisted volumes. The Flink
version we used is 1.14.

After the job runs for some time, we observed that the size of the local
RocksDB directory started to grow and there are more and more
directories created inside it. It seems that when the job is restarted or
the task manager K8s pod is restarted, the previous RocksDB directory
corresponding to the assigned operator is not cleaned up. Here is an
example:

drwxr-xr-x 3 root root 4096 Jun 27 18:23
job__op_WindowOperator_2b0a50a068bb7f1c8a470e4f763cbf26__1_4__uuid_c97f3f3f-649a-467d-82af-2bc250ec6e22
drwxr-xr-x 3 root root 4096 Jun 27 18:45
job__op_WindowOperator_2b0a50a068bb7f1c8a470e4f763cbf26__1_4__uuid_e4fca2c3-74c7-4aa2-9ca1-dda866b8de11
drwxr-xr-x 3 root root 4096 Jun 27 18:56
job__op_WindowOperator_2b0a50a068bb7f1c8a470e4f763cbf26__2_4__uuid_f1fa-7402-494d-80d7-65861394710c
drwxr-xr-x 3 root root 4096 Jun 27 17:34
job__op_WindowOperator_f6dc7f4d2283f4605b127b9364e21148__3_4__uuid_08a14423-bea1-44ce-96ee-360a516d72a6

Although only
job__op_WindowOperator_2b0a50a068bb7f1c8a470e4f763cbf26__2_4__uuid_f1fa-7402-494d-80d7-65861394710c
is the active running operator, the other directories for the past
operators still exist.

We set up the task manager property taskmanager.resource-id to be the task
manager pod name under the statefulset but it did not seem to help cleaning
up previous directories.

Any pointers to solve this issue?

We checked the latest document and it seems that Flink 1.15 introduced the
concept of local working directory:
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/working_directory/.
Does that help cleaning up the RocksDB directory?

Thanks,
Allen


Re: Re: [DISCUSS] Contribution of Multi Cluster Kafka Source

2022-06-27 Thread Andrew Otto
This sounds very useful!  Another potential use case:

- Consuming from multiple kafka clusters in different datacenters/regions.

I'm not sure if we would ultimately want to do this, but having it as an
option when you need events from multiple kafka clusters to get the full
history of changes (instead of relying on MirrorMaker) could be nice.






On Mon, Jun 27, 2022 at 1:02 PM Ryan van Huuksloot <
ryan.vanhuuksl...@shopify.com> wrote:

> Hi Mason,
>
> Thanks for starting this discussion! The proposed Source sounds awesome
> and we would be interested in taking a look at the source code and
> evaluating our use cases. We can provide information and review on a
> potential FLIP based on other use cases.
>
> Do you have a fork/branch that you are working with that is public? Could
> you attach that so we can start looking at it?
>
> Let us know if you need anything from us to help move this forward.
>
> Thanks!
> Ryan
>
> On 2022/06/27 03:08:13 Qingsheng Ren wrote:
> > Hi Mason,
> >
> > It sounds like an exciting enhancement to the Kafka source and will
> benefit a lot of users I believe.
> >
> > Would you prefer to reuse the existing flink-connector-kafka module or
> create a new one for the new multi-cluster feature? Personally I prefer the
> former one because users won’t need to introduce another dependency module
> to their projects in order to use the feature.
> >
> > Thanks for the effort on this and looking forward to your FLIP!
> >
> > Best,
> > Qingsheng
> >
> > > On Jun 24, 2022, at 09:43, Mason Chen  wrote:
> > >
> > > Hi community,
> > >
> > > We have been working on a Multi Cluster Kafka Source and are looking to
> > > contribute it upstream. I've given a talk about the features and
> design at
> > > a Flink meetup: https://youtu.be/H1SYOuLcUTI.
> > >
> > > The main features that it provides is:
> > > 1. Reading multiple Kafka clusters within a single source.
> > > 2. Adjusting the clusters and topics the source consumes from
> dynamically,
> > > without Flink job restart.
> > >
> > > Some of the challenging use cases that these features solve are:
> > > 1. Transparent Kafka cluster migration without Flink job restart.
> > > 2. Transparent Kafka topic migration without Flink job restart.
> > > 3. Direct integration with Hybrid Source.
> > >
> > > In addition, this is designed with wrapping and managing the existing
> > > KafkaSource components to enable these features, so it can continue to
> > > benefit from KafkaSource improvements and bug fixes. It can be
> considered
> > > as a form of a composite source.
> > >
> > > I think the contribution of this source could benefit a lot of users
> who
> > > have asked in the mailing list about Flink handling Kafka migrations
> and
> > > removing topics in the past. I would love to hear and address your
> thoughts
> > > and feedback, and if possible drive a FLIP!
> > >
> > > Best,
> > > Mason
> >
> >
>


RE: Re: [DISCUSS] Contribution of Multi Cluster Kafka Source

2022-06-27 Thread Ryan van Huuksloot
Hi Mason,

Thanks for starting this discussion! The proposed Source sounds awesome and
we would be interested in taking a look at the source code and evaluating
our use cases. We can provide information and review on a potential FLIP
based on other use cases.

Do you have a fork/branch that you are working with that is public? Could
you attach that so we can start looking at it?

Let us know if you need anything from us to help move this forward.

Thanks!
Ryan

On 2022/06/27 03:08:13 Qingsheng Ren wrote:
> Hi Mason,
>
> It sounds like an exciting enhancement to the Kafka source and will
benefit a lot of users I believe.
>
> Would you prefer to reuse the existing flink-connector-kafka module or
create a new one for the new multi-cluster feature? Personally I prefer the
former one because users won’t need to introduce another dependency module
to their projects in order to use the feature.
>
> Thanks for the effort on this and looking forward to your FLIP!
>
> Best,
> Qingsheng
>
> > On Jun 24, 2022, at 09:43, Mason Chen  wrote:
> >
> > Hi community,
> >
> > We have been working on a Multi Cluster Kafka Source and are looking to
> > contribute it upstream. I've given a talk about the features and design
at
> > a Flink meetup: https://youtu.be/H1SYOuLcUTI.
> >
> > The main features that it provides is:
> > 1. Reading multiple Kafka clusters within a single source.
> > 2. Adjusting the clusters and topics the source consumes from
dynamically,
> > without Flink job restart.
> >
> > Some of the challenging use cases that these features solve are:
> > 1. Transparent Kafka cluster migration without Flink job restart.
> > 2. Transparent Kafka topic migration without Flink job restart.
> > 3. Direct integration with Hybrid Source.
> >
> > In addition, this is designed with wrapping and managing the existing
> > KafkaSource components to enable these features, so it can continue to
> > benefit from KafkaSource improvements and bug fixes. It can be
considered
> > as a form of a composite source.
> >
> > I think the contribution of this source could benefit a lot of users who
> > have asked in the mailing list about Flink handling Kafka migrations and
> > removing topics in the past. I would love to hear and address your
thoughts
> > and feedback, and if possible drive a FLIP!
> >
> > Best,
> > Mason
>
>


[ANNOUNCE] Apache Flink Kubernetes Operator 1.0.1 released

2022-06-27 Thread Gyula Fóra
The Apache Flink community is very happy to announce the release of Apache
Flink Kubernetes Operator 1.0.1.

The Flink Kubernetes Operator allows users to manage their Apache Flink
applications and their lifecycle through native k8s tooling like kubectl.


The release is available for download at:
https://flink.apache.org/downloads.html

Official Docker image for Flink Kubernetes Operator applications can be
found at:
https://hub.docker.com/r/apache/flink-kubernetes-operator

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351812

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Gyula Fora



Flink k8s job submission process

2022-06-27 Thread hjw
Flink version: 1.15.0
How is the job submission flow for Flink on native k8s implemented in version 1.15.0? Or, put differently, how is Flink on
Native k8s designed? I would like to learn about it; if anyone has relevant documentation or materials, please let me know. Thanks :)


Re: Mail from 潘明文

2022-06-27 Thread Weihua Hu
Hi,
The image cannot be displayed. Normally there is no guarantee on the execution order between multiple sink operators; they can run in parallel. However, if multiple sinks are chained
together by operator chaining, data is processed serially within a single operator chain.
Best,
Weihua


On Fri, Jun 24, 2022 at 9:29 PM Lincoln Lee  wrote:

> Hi,
> Images pasted directly into the mail cannot be displayed; could you send the content as text?
>
> Best,
> Lincoln Lee
>
>
> 潘明文 wrote on Fri, Jun 24, 2022, at 16:36:
> >
> > Hi, can the two SINKs below be processed concurrently in parallel? Or must they run serially, so that the second SINK only starts after the first SINK has finished?
> >
> >
>


Re: Job cancel fails, individual tasks stuck in the CANCELING state

2022-06-27 Thread Weihua Hu
Hi,
If a Task fails to cancel for a long time (180s by default), the watchdog is triggered and the TaskManager exits on its own; it also periodically logs the thread information of the
Task thread that is currently executing (every 30s by default). You can check the TaskManager logs and search for the keyword

but is stuck in method:


Best,
Weihua


On Mon, Jun 27, 2022 at 6:45 PM Lijie Wang  wrote:

> Hi,
>
> 1. I suggest posting the complete TM log and the jstack output
> 2. You can also check the GC log to see whether GC is normal
>
> Best,
> Lijie
>
> 李辉 wrote on Mon, Jun 27, 2022, at 15:46:
>
> > Help needed: as in the subject, Flink version 1.13.2, job deployed on k8s
> >
> > 1. Overview:
> >
> >
> > 2. Log of the hung TM; there are no further log entries after this and no exceptions:
> >
> >
> >
> > 3. jstack analysis: no threads in BLOCKED state were found
> >
> >
> >
>


Re: Flink k8s Operator on AWS?

2022-06-27 Thread Matt Casters
The problem was a misconfiguration of the initContainer which would copy my
artifacts from s3 to an ephemeral volume.  This caused the task manager to
get started for a bit and then to be shut down.  It was hard to get logging
about this since the pods were gone before I could get logging from it.  I
chalk all that up to just me lacking a bit of experience with k8s.

That being said... It's all working now and I documented the deployment
over here:

https://hop.apache.org/manual/next/pipeline/beam/flink-k8s-operator-running-hop-pipeline.html

A big thank you to everyone that helped me out!

Cheers,
Matt

On Mon, Jun 27, 2022 at 4:59 AM Yang Wang  wrote:

> Could you please share the JobManager logs of failed deployment? It will
> also help a lot if you could show the pending pod status via "kubectl
> describe ".
>
> Given that the current Flink Kubernetes Operator is built on top of native
> K8s integration[1], the Flink ResourceManager should allocate enough
> TaskManager pods automatically.
> We need to find out what is wrong via the logs. Maybe the service account
> or taint or something else.
>
>
> [1]. https://flink.apache.org/2021/02/10/native-k8s-with-ha.html
>
>
> Best,
> Yang
>
> Matt Casters wrote on Fri, Jun 24, 2022, at 23:48:
>
>> Yes of-course.  I already feel a bit less intelligent for having asked
>> the question ;-)
>>
>> The status now is that I managed to have it all puzzled together.
>> Copying the files from s3 to an ephemeral volume takes all of 2 seconds so
>> it's really not an issue.  The cluster starts and our fat jar and Apache
>> Hop MainBeam class is found and started.
>>
>> The only thing that remains is figuring out how to configure the Flink
>> cluster itself.  I have a couple of m5.large ec2 instances in a node group
>> on EKS and I set taskmanager.numberOfTaskSlots to "4".  However, the tasks
>> in the pipeline can't seem to find resources to start.
>>
>> Caused by:
>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> Slot request bulk is not fulfillable! Could not allocate the required slot
>> within slot request timeout
>>
>> Parallelism was set to 1 for the runner and there are only 2 tasks in my
>> first Beam pipeline so it should be simple enough but it just times out.
>>
>> Next step for me is to document the result which will end up on
>> hop.apache.org.   I'll probably also want to demo this in Austin at the
>> upcoming Beam summit.
>>
>> Thanks a lot for your time and help so far!
>>
>> Cheers,
>> Matt
>>
>>


RE: Synchronizing streams in coprocessfunction

2022-06-27 Thread Schwalbe Matthias
Hi Gopi,

Your use case is a little under-specified to give a specific answer, especially
regarding the nature of the two input streams and the way events of both streams are
correlated (joined):

  *   Is your fast-stream keyed?
      *   If yes: keyed state and timers can be used; otherwise only operator state can be used to buffer events, and no timers
  *   Is your metadata-stream keyed? I.e.
      *   Metadata-stream events are combined only with fast-stream events having the same respective key
          *   Implement a KeyedCoProcessFunction … (a minimal sketch follows below)
      *   Metadata-stream events apply to all fast-stream events irrespective of the key
          *   Implement a KeyedBroadcastProcessFunction (after converting the metadata-stream to a broadcast stream)
          *   Then in the processBroadcastElement function you can iterate over all keys of all state primitives
  *   None of your streams are keyed?
      *   That leaves you only the option of using operator state
          *   The current implementation of operator state is not incremental and thus it is completely generated/stored with each state checkpoint
          *   This allows only a moderate number of datapoints in operator state
  *   Which version of Flink are you using? Recommendations above refer to Flink 1.15.0
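
For the keyed case, a minimal sketch of the buffering pattern could look like the following (Event, Metadata and Output are hypothetical placeholder types; state TTL and timer-based cleanup are omitted):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch: buffer fast-stream events in keyed state until the
// first metadata record for that key arrives, then flush and process directly.
public class BufferUntilMetadata
        extends KeyedCoProcessFunction<String, Event, Metadata, Output> {

    private transient ValueState<Metadata> metadataState;
    private transient ListState<Event> bufferState;

    @Override
    public void open(Configuration parameters) {
        metadataState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("metadata", Metadata.class));
        bufferState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-events", Event.class));
    }

    @Override
    public void processElement1(Event event, Context ctx, Collector<Output> out) throws Exception {
        Metadata meta = metadataState.value();
        if (meta == null) {
            bufferState.add(event);        // no metadata yet: park the event in keyed state
        } else {
            out.collect(combine(event, meta));
        }
    }

    @Override
    public void processElement2(Metadata meta, Context ctx, Collector<Output> out) throws Exception {
        metadataState.update(meta);
        for (Event buffered : bufferState.get()) {   // flush everything buffered so far
            out.collect(combine(buffered, meta));
        }
        bufferState.clear();
    }

    private Output combine(Event event, Metadata meta) {
        // hypothetical business logic
        return new Output(event, meta);
    }
}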

Looking forward to your answers (also please go into a little more detail on your
use case) and follow-up questions …

Greetings

Thias


From: Gopi Krishna M 
Sent: Monday, June 27, 2022 3:01 PM
To: Qingsheng Ren 
Cc: user@flink.apache.org
Subject: Re: Synchronizing streams in coprocessfunction

Thanks Qingsheng, that would definitely work. But I'm unable to figure out how
I can apply this with CoProcessFunction. One stream is windowed and the trigger
implementation uses the 2nd stream.

On Mon, Jun 27, 2022 at 3:29 PM Qingsheng Ren <re...@apache.org> wrote:
Hi Gopi,

What about using a window with a custom trigger? The window is doing nothing 
but aggregating your input to a collection. The trigger accepts metadata from 
the low input stream so it can fire and purge the window (emit all elements in 
the window to downstream) on arrival of metadata.

Best,
Qingsheng

> On Jun 27, 2022, at 12:46, Gopi Krishna M <gopikrish...@gmail.com> wrote:
>
> Hi,
> I've a scenario where I use connected streams where one is a low throughput 
> metadata stream and another one is a high throughput data stream. I use 
> CoProcessFunction that operates on a data stream with behavior controlled by 
> a metadata stream.
>
> Is there a way to slow down/pause the high throughput data stream until I've 
> received one entry from the metadata stream? It's possible that by the time I 
> get the first element from the metadata stream, I might get 1000s of items 
> from the data stream. One option is to create a state to buffer the data 
> stream within the operator. Is there any other option which doesn't need this 
> state management?
>
> Thanks,
> Gopi


Re: Synchronizing streams in coprocessfunction

2022-06-27 Thread Gopi Krishna M
Thanks Qingsheng, that would definitely work. But I'm unable to figure out
how I can apply this with CoProcessFunction. One stream is windowed and
the trigger implementation uses the 2nd stream.

On Mon, Jun 27, 2022 at 3:29 PM Qingsheng Ren  wrote:

> Hi Gopi,
>
> What about using a window with a custom trigger? The window is doing
> nothing but aggregating your input to a collection. The trigger accepts
> metadata from the low input stream so it can fire and purge the window
> (emit all elements in the window to downstream) on arrival of metadata.
>
> Best,
> Qingsheng
>
> > On Jun 27, 2022, at 12:46, Gopi Krishna M 
> wrote:
> >
> > Hi,
> > I've a scenario where I use connected streams where one is a low
> throughput metadata stream and another one is a high throughput data
> stream. I use CoProcessFunction that operates on a data stream with
> behavior controlled by a metadata stream.
> >
> > Is there a way to slow down/pause the high throughput data stream until
> I've received one entry from the metadata stream? It's possible that by the
> time I get the first element from the metadata stream, I might get 1000s of
> items from the data stream. One option is to create a state to buffer the
> data stream within the operator. Is there any other option which doesn't
> need this state management?
> >
> > Thanks,
> > Gopi
>
>


Re: Job cancel fails, individual tasks stuck in the CANCELING state

2022-06-27 Thread Lijie Wang
Hi,

1. I suggest posting the complete TM log and the jstack output
2. You can also check the GC log to see whether GC is normal

Best,
Lijie

李辉 wrote on Mon, Jun 27, 2022, at 15:46:

> Help needed: as in the subject, Flink version 1.13.2, job deployed on k8s
>
> 1. Overview:
>
>
> 2. Log of the hung TM; there are no further log entries after this and no exceptions:
>
>
>
> 3. jstack analysis: no threads in BLOCKED state were found
>
>
>


Re: Synchronizing streams in coprocessfunction

2022-06-27 Thread Qingsheng Ren
Hi Gopi,

What about using a window with a custom trigger? The window is doing nothing 
but aggregating your input to a collection. The trigger accepts metadata from 
the low input stream so it can fire and purge the window (emit all elements in 
the window to downstream) on arrival of metadata. 

Best,
Qingsheng

> On Jun 27, 2022, at 12:46, Gopi Krishna M  wrote:
> 
> Hi,
> I've a scenario where I use connected streams where one is a low throughput 
> metadata stream and another one is a high throughput data stream. I use 
> CoProcessFunction that operates on a data stream with behavior controlled by 
> a metadata stream.
> 
> Is there a way to slow down/pause the high throughput data stream until I've 
> received one entry from the metadata stream? It's possible that by the time I 
> get the first element from the metadata stream, I might get 1000s of items 
> from the data stream. One option is to create a state to buffer the data 
> stream within the operator. Is there any other option which doesn't need this 
> state management?
> 
> Thanks,
> Gopi



Re: Can flink1.15 run on jdk8?

2022-06-27 Thread yuxia
Yes, jdk8 can be used.

Best regards,
Yuxia

- Original message -
From: "yidan zhao"
To: "user-zh"
Sent: Monday, June 27, 2022, 5:11:22 PM
Subject: Can flink1.15 run on jdk8?

As in the subject: the official docs suggest upgrading to jdk11, but after downloading the official flink1.15 I can start a cluster on jdk8 just fine, and on jdk11 as well.


Can flink1.15 run on jdk8?

2022-06-27 Thread yidan zhao
As in the subject: the official docs suggest upgrading to jdk11, but after downloading the official flink1.15 I can start a cluster on jdk8 just fine, and on jdk11 as well.


Re: flink 1.14

2022-06-27 Thread yidan zhao
airflow~

RS wrote on Mon, Jun 27, 2022, at 09:11:
>
> Hi,
> Here we schedule jobs via DolphinScheduler, which also lets you configure dependencies between jobs.
> Other scheduling systems should have similar capabilities.
>
>
> Thanks~
>
>
>
>
>
> On 2022-04-29 16:03:15, "guanyq" wrote:
> > A question for the experts:
> > When running batch jobs with flink sql, what is typically used for scheduled orchestration in production?
> > And if there are dependencies between jobs, what is used in production for notification?
> >
> >
> > On my side I mainly want to migrate hive sql to flink sql.


[FINAL CALL] - Travel Assistance to ApacheCon New Orleans 2022

2022-06-27 Thread Gavin McDonald
 To all committers and non-committers.

This is a final call to apply for travel/hotel assistance to get to and
stay in New Orleans
for ApacheCon 2022.

Applications have been extended by one week and so the application deadline
is now the 8th July 2022.

The rest of this email is a copy of what has been sent out previously.

We will be supporting ApacheCon North America in New Orleans, Louisiana,
on October 3rd through 6th, 2022.

TAC exists to help those that would like to attend ApacheCon events, but
are unable to do so for financial reasons. This year, We are supporting
both committers and non-committers involved with projects at the
Apache Software Foundation, or open source projects in general.

For more info on this year's applications and qualifying criteria, please
visit the TAC website at http://www.apache.org/travel/
Applications have been extended until the 8th of July 2022.

Important: Applicants have until the closing date above to submit their
applications (which should contain as much supporting material as required
to efficiently and accurately process their request), this will enable TAC
to announce successful awards shortly afterwards.

As usual, TAC expects to deal with a range of applications from a diverse
range of backgrounds. We therefore encourage (as always) anyone thinking
about sending in an application to do so ASAP.

Why should you attend as a TAC recipient? We encourage you to read stories
from
past recipients at https://apache.org/travel/stories/ . Also note that
previous TAC recipients have gone on to become Committers, PMC Members, ASF
Members, Directors of the ASF Board and Infrastructure Staff members.
Others have gone from Committer to full time Open Source Developers!

How far can you go! - Let TAC help get you there.


===

Gavin McDonald on behalf of the Travel Assistance Committee.


Job cancel fails, individual tasks stuck in the CANCELING state

2022-06-27 Thread 李辉
Help needed: as in the subject, Flink version 1.13.2, job deployed on k8s

1. Overview:


2. Log of the hung TM; there are no further log entries after this and no exceptions:



3. jstack analysis: no threads in BLOCKED state were found


taskmanager_mipha-69-taskmanager-1-18_thread_dump.json
Description: application/json




How can the kafka offset be ignored with the new kafka source?

2022-06-27 Thread yidan zhao
As in the subject: I would like to add a custom switch so that, when the job is restarted with that switch set, the kafka source is forced to use latest instead of restoring from state. Note that the other operators still need to be restored from state.


Previously I overrode the initializeState method in FlinkKafkaConsumerBase and, based on a boolean config I added, decided whether or not to restore
the state of the kafka source part.


With the new kafkaSource, my analysis yields roughly the following ideas, and I am not sure whether they are OK.
(1) Adjust things at the SourceOperator level; this requires overriding runtime classes, so I am not considering it for now.
(2) Override the addSplits method of
SourceReaderBase. The current implementation apparently cannot distinguish newly added partitions from restored partitions,
but that hardly matters, since changing topics or adding partitions is rare. At this point I could decide, based on configuration, whether to force latest (see the sketch at the end of this message).
(3) When the new kafkaSource restarts from state, the state that KafkaSourceEnumerator restores is only the assignedPartitions, which in essence has nothing to do with offsets; only newly discovered partitions go through the initial offset logic.
From the KafkaSourceEnumerator's point of view, I could let it ignore its state so that every discovered topic counts as new, and the offset can then be customized via initOffset.
However, in that case the sourceReader part has to ignore the restored kafkaSplits
(the call in the SourceOperator.open method) and only accept newly assigned kafkaSplits (the call in the SourceOperator.handleAddSplitsEvent method), which again means adjusting the
sourceOperator.


As described above, there does not seem to be a particularly good way to change this right now. I am hoping for a simple change, because I have to re-implement it on every version upgrade.
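
A rough sketch of the split-rewriting helper that idea (2) would need, assuming KafkaPartitionSplit from flink-connector-kafka; the method itself is hypothetical and not an existing Flink API:

import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: when the "ignore restored offsets" switch is on, rewrite
// the restored splits so every partition starts from the latest offset before
// handing them on (e.g. from an overridden addSplits).
static List<KafkaPartitionSplit> forceLatest(List<KafkaPartitionSplit> restoredSplits) {
    return restoredSplits.stream()
            .map(split -> new KafkaPartitionSplit(
                    split.getTopicPartition(), KafkaPartitionSplit.LATEST_OFFSET))
            .collect(Collectors.toList());
}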


Can expired Flink state be written to the log?

2022-06-27 Thread haishui
Hi,
Can Flink state expiry work like the Caffeine cache, i.e. invoke a callback when data expires and print the expired content to the log?

Best Regards!