Re: [DISCUSS] FLIP-391: Deprecate RuntimeContext#getExecutionConfig

2023-11-15 Thread Zhu Zhu
Thanks Junrui for creating the FLIP and kicking off this discussion.

Exposing a mutable ExecutionConfig which is even shared by multiple
operators is truly a defect which can result in weird results.

+1

Thanks,
Zhu

Junrui Lee wrote on Wed, Nov 15, 2023 at 16:53:

> Hi all,
>
> I'd like to start a discussion of FLIP-391: Deprecate
> RuntimeContext#getExecutionConfig[1].
>
> Currently, the FLINK RuntimeContext is important for connecting user
> functions to the underlying runtime details. It provides users with
> necessary runtime information during job execution.
> However, the current implementation of the FLINK RuntimeContext exposes the
> ExecutionConfig to users, resulting in two issues:
> Firstly, the ExecutionConfig contains much unrelated information that can
> confuse users and complicate management.
> Secondly, exposing the ExecutionConfig allows users to modify it during job
> execution, which can cause inconsistencies and problems, especially with
> operator chaining.
>
> Therefore, we propose deprecating the RuntimeContext#getExecutionConfig in
> the FLINK RuntimeContext. In the upcoming FLINK-2.0 version, we plan to
> completely remove the RuntimeContext#getExecutionConfig method. Instead, we
> will introduce alternative getter methods that enable users to access
> specific information without exposing unnecessary runtime details. These
> getter methods will include:
>
> 1. @PublicEvolving <T> TypeSerializer<T>
> createSerializer(TypeInformation<T> typeInformation);
> 2. @PublicEvolving Map<String, String> getGlobalJobParameters();
> 3. @PublicEvolving boolean isObjectReuseEnabled();
>
> Looking forward to your feedback and suggestions, thanks.
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465937
>
> Best regards,
> Junrui
>
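
The concern discussed in this thread can be illustrated with a small toy model. It is only a sketch and does not use Flink classes: MutableConfig and ReadOnlyContext are hypothetical stand-ins for a shared, mutable config object and for a narrow set of read-only getters like the ones proposed above. The point is that a caller holding the shared object sees changes made by another caller at runtime, while a read-only snapshot does not.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Toy model (not Flink code): why handing out a shared mutable config is risky. */
class SharedConfigDemo {

    /** Hypothetical stand-in for a mutable, job-wide config object. */
    static class MutableConfig {
        private boolean objectReuse;
        private final Map<String, String> globalJobParameters = new HashMap<>();

        void enableObjectReuse() {
            objectReuse = true;
        }

        boolean isObjectReuseEnabled() {
            return objectReuse;
        }

        Map<String, String> getGlobalJobParameters() {
            return globalJobParameters;
        }
    }

    /** Hypothetical narrow, read-only view in the spirit of the proposed getters. */
    interface ReadOnlyContext {
        boolean isObjectReuseEnabled();

        Map<String, String> getGlobalJobParameters();
    }

    /** Takes a snapshot, so later mutation of the config is not visible through the view. */
    static ReadOnlyContext readOnlyView(MutableConfig config) {
        final boolean objectReuse = config.isObjectReuseEnabled();
        final Map<String, String> params =
                Collections.unmodifiableMap(new HashMap<>(config.getGlobalJobParameters()));
        return new ReadOnlyContext() {
            @Override
            public boolean isObjectReuseEnabled() {
                return objectReuse;
            }

            @Override
            public Map<String, String> getGlobalJobParameters() {
                return params;
            }
        };
    }

    public static void main(String[] args) {
        MutableConfig shared = new MutableConfig();
        ReadOnlyContext viewForOperatorB = readOnlyView(shared);

        // "Operator A" flips a flag at runtime on the shared object.
        shared.enableObjectReuse();

        // Anything holding the shared object sees the change,
        // while the read-only view handed to "operator B" is unaffected.
        System.out.println("shared config sees: " + shared.isObjectReuseEnabled());
        System.out.println("read-only view sees: " + viewForOperatorB.isObjectReuseEnabled());
    }
}
```

Whether the real getters return snapshots or live values is not stated in this thread; the snapshot here is a choice made for the toy model only.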


Re: [DISCUSS] Planning Flink 1.17

2022-10-23 Thread Zhu Zhu
Thanks for kicking off the new release.

+1 for January 17th as the feature freeze date :)
+1 for Qingsheng, Leonard, Martijn and Matthias as release managers

Thanks,
Zhu

Dong Lin wrote on Sun, Oct 23, 2022 at 15:09:
>
> Thanks for kicking off the release plan.
>
> +1 for the proposed timeline.
>
> Best,
> Dong
>
>
> On Thu, Oct 20, 2022 at 3:46 PM Qingsheng Ren  wrote:
>>
>> Hi everyone,
>>
>> As we are approaching the official release of Flink 1.16, it’s a good time 
>> to kick off some discussions and march toward 1.17.
>>
>> - Release managers
>>
>> Leonard Xu and I would like to volunteer as release managers for 1.17, and 
>> it would be great to have someone else working together on this release. 
>> Please let us know if you have any interest!
>>
>> - Timeline
>>
>> With 1.16 due to be released in the next few days, followed by the usual
>> 4-month release cycle, we propose to set the feature freeze date to *January
>> 17th, 2023* (aligned with our version number 1.17 :-)), so that everyone
>> can enjoy the holiday season and Chinese New Year.
>>
>> - What we’ll be focusing on
>>
>> Similar to our previous releases, we’d like to keep an eye on the timeline, 
>> CI stability, release testing, and any communication and coordination across 
>> teams and developers. One thing we’d like to mention in particular is 
>> compatibility, which is a frequent complaint from our ecosystem developers 
>> and users. We encourage all committers to do an extra manual check to see if 
>> any public interface is touched before merging a PR. We could discuss 
>> details in another thread later and update the contributing guidelines to 
>> list which should be treated as public APIs. Please feel free to raise any 
>> discussions if you have anything else to emphasize specifically.
>>
>> - Collecting features
>>
>> We'll create a wiki page under this directory[1] for collecting new features 
>> targeted in 1.17 as we always did before to give everyone an overview and 
>> track the progress. Please don’t hesitate to share your ideas on the page. 
>> In the meantime, we’d like to kindly invite our committers to think about 
>> and plan what we could deliver to developers and users in this release.
>>
>> Looking forward to working with you all in the coming 1.17 release!
>>
>> Best regards,
>> Qingsheng Ren and Leonard Xu
>> Ververica (Alibaba)
>>
>> [1] 
>> https://cwiki.apache.org/confluence/display/FLINK/Release+Management+and+Feature+Plan


Re: Jobmanagers are in a crash loop after upgrade from 1.12.2 to 1.13.1

2021-06-30 Thread Zhu Zhu
Hi Shilpa,

JobType was introduced in 1.13, so I guess the cause is that the client
which creates and submits the job is still on 1.12.2. That client generates
an outdated job graph which does not have its JobType set, which results in
this NPE.

Thanks,
Zhu

Austin Cawley-Edwards wrote on Thu, Jul 1, 2021 at 1:54 AM:

> Hi Shilpa,
>
> Thanks for reaching out to the mailing list and providing those logs! The
> NullPointerException looks odd to me, but in order to better guess what's
> happening, can you tell me a little bit more about what your setup looks
> like? How are you deploying, i.e., standalone with your own manifests, the
> Kubernetes integration of the Flink CLI, some open-source operator, etc.?
>
> Also, are you using a High Availability setup for the JobManager?
>
> Best,
> Austin
>
>
> On Wed, Jun 30, 2021 at 12:31 PM Shilpa Shankar 
> wrote:
>
>> Hello,
>>
>> We have a flink session cluster in kubernetes running on 1.12.2. We
>> attempted an upgrade to v 1.13.1, but the jobmanager pods are continuously
>> restarting and are in a crash loop.
>>
>> Logs are attached for reference.
>>
>> How do we recover from this state?
>>
>> Thanks,
>> Shilpa
>>
>


Re: [ANNOUNCE] Apache Flink 1.13.0 released

2021-05-06 Thread Zhu Zhu
Thanks Dawid and Guowei for being the release managers! And thanks everyone
who has made this release possible!

Thanks,
Zhu

Yun Tang wrote on Thu, May 6, 2021 at 2:30 PM:

> Thanks for Dawid and Guowei's great work, and thanks for everyone involved
> for this release.
>
> Best
> Yun Tang
> --
> *From:* Xintong Song 
> *Sent:* Thursday, May 6, 2021 12:08
> *To:* user ; dev 
> *Subject:* Re: [ANNOUNCE] Apache Flink 1.13.0 released
>
> Thanks Dawid & Guowei as the release managers, and everyone who has
> contributed to this release.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, May 6, 2021 at 9:51 AM Leonard Xu  wrote:
>
> > Thanks Dawid & Guowei for the great work, thanks everyone involved.
> >
> > Best,
> > Leonard
> >
> > On May 5, 2021, at 17:12, Theo Diefenthal wrote:
> >
> > Thanks for managing the release. +1. I like the focus on improving
> > operations with this version.
> >
> > --
> > *From: *"Matthias Pohl" 
> > *To: *"Etienne Chauchot" 
> > *CC: *"dev" , "Dawid Wysakowicz" <
> > dwysakow...@apache.org>, "user" ,
> > annou...@apache.org
> > *Sent: *Tuesday, May 4, 2021 21:53:31
> > *Subject: *Re: [ANNOUNCE] Apache Flink 1.13.0 released
> >
> > Yes, thanks for managing the release, Dawid & Guowei! +1
> >
> > On Tue, May 4, 2021 at 4:20 PM Etienne Chauchot 
> > wrote:
> >
> >> Congrats to everyone involved !
> >>
> >> Best
> >>
> >> Etienne
> >> On 03/05/2021 15:38, Dawid Wysakowicz wrote:
> >>
> >> The Apache Flink community is very happy to announce the release of
> >> Apache Flink 1.13.0.
> >>
> >> Apache Flink® is an open-source stream processing framework for
> >> distributed, high-performing, always-available, and accurate data
> streaming
> >> applications.
> >>
> >> The release is available for download at:
> >> https://flink.apache.org/downloads.html
> >>
> >> Please check out the release blog post for an overview of the
> >> improvements for this release:
> >> https://flink.apache.org/news/2021/05/03/release-1.13.0.html
> >>
> >> The full release notes are available in Jira:
> >>
> >>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349287
> >>
> >> We would like to thank all contributors of the Apache Flink community
> who
> >> made this release possible!
> >>
> >> Regards,
> >> Guowei & Dawid
> >>
> >>
> >
> >
>


Re: [ANNOUNCE] Apache Flink 1.12.2 released

2021-03-08 Thread Zhu Zhu
Thanks Roman and Yuan for being the release managers! Thanks everyone who
has made this release possible!

Cheers,
Zhu

Piotr Nowojski wrote on Sat, Mar 6, 2021 at 12:38 AM:

> Thanks Roman and Yuan for your work and driving the release process :)
>
> On Fri, Mar 5, 2021 at 15:53, Till Rohrmann wrote:
>
>> Great work! Thanks a lot for being our release managers Roman and Yuan and
>> to everyone who has made this release possible.
>>
>> Cheers,
>> Till
>>
>> On Fri, Mar 5, 2021 at 10:43 AM Yuan Mei  wrote:
>>
>> > Cheers!
>> >
>> > Thanks, Roman, for doing the most time-consuming and difficult part of
>> the
>> > release!
>> >
>> > Best,
>> >
>> > Yuan
>> >
>> > On Fri, Mar 5, 2021 at 5:41 PM Roman Khachatryan 
>> wrote:
>> >
>> > > The Apache Flink community is very happy to announce the release of
>> > Apache
>> > > Flink 1.12.2, which is the second bugfix release for the Apache Flink
>> > 1.12
>> > > series.
>> > >
>> > > Apache Flink® is an open-source stream processing framework for
>> > > distributed, high-performing, always-available, and accurate data
>> > streaming
>> > > applications.
>> > >
>> > > The release is available for download at:
>> > > https://flink.apache.org/downloads.html
>> > >
>> > > Please check out the release blog post for an overview of the
>> > improvements
>> > > for this bugfix release:
>> > > https://flink.apache.org/news/2021/03/03/release-1.12.2.html
>> > >
>> > > The full release notes are available in Jira:
>> > >
>> > >
>> >
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12349502&projectId=12315522
>> > >
>> > > We would like to thank all contributors of the Apache Flink community
>> who
>> > > made this release possible!
>> > >
>> > > Special thanks to Yuan Mei for managing the release and PMC members
>> > Robert
>> > > Metzger, Chesnay Schepler and Piotr Nowojski.
>> > >
>> > > Regards,
>> > > Roman
>> > >
>> >
>>
>


Re: How to tell inside the program whether the job has restarted

2021-02-04 Thread Zhu Zhu
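RuntimeContext has a getAttemptNumber() method, which tells you which attempt
of the task is currently running.
In general, though, we let an external system monitor the numRestarts metric
of the Flink job to determine whether a failover has happened, and alert on that.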
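
As a minimal sketch of the first approach, the helper below turns an attempt number into a "was restarted" flag. The class and method names are hypothetical and the code does not depend on Flink; inside a rich function the argument would come from getRuntimeContext().getAttemptNumber(). Note that the attempt number is tracked per task, so it describes the task that calls it and not necessarily the job as a whole.

```java
/** Sketch: turning a task attempt number into a "was restarted" signal. */
class RestartCheck {

    /** Attempt numbers start at 0, so anything above 0 means the task was re-run. */
    static boolean isRestartedAttempt(int attemptNumber) {
        if (attemptNumber < 0) {
            throw new IllegalArgumentException("attempt number must be >= 0: " + attemptNumber);
        }
        return attemptNumber > 0;
    }

    public static void main(String[] args) {
        // In a rich function's open(), the argument would come from
        // getRuntimeContext().getAttemptNumber(); here it is hard-coded.
        for (int attempt = 0; attempt < 3; attempt++) {
            System.out.println("attempt " + attempt + " restarted=" + isRestartedAttempt(attempt));
        }
    }
}
```

For alerting, monitoring the numRestarts metric from outside the job, as described above, avoids putting this logic into user functions.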

Thanks,
Zhu

tison wrote on Fri, Feb 5, 2021 at 12:10 PM:

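> What comes to mind for now is adding a scheduler plugin and hooking into
> the restart event.
>
> The normal restart flow does not seem to have any other hook points. I am
> cc'ing an expert in this area (in cc) to see whether there are other opinions.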
>
> Best,
> tison.
>
>
> 熊云昆 wrote on Fri, Feb 5, 2021 at 11:30 AM:
>
>> Try super.getRuntimeContext().getAttemptNumber() to get the restart
>> count. It is 0 if there has been no restart, and it increases by 1 on
>> every restart.
>>
>> 熊云昆
>> Email: xiongyun...@163.com
>>
>> On 2021-02-04 11:42, op wrote:
>> Hi, not all of our downstream use cases deduplicate; the overhead is a
>> bit high...
>>
>> -- Original message --
>> From: "user-zh" <zapj...@163.com>
>> Sent: Thursday, February 4, 2021, 11:31 AM
>> To: "user-zh"
>> Subject: Re: Reply: How to tell inside the program whether the job has restarted
>>
>> Make the downstream writes idempotent, and repeated operations are no
>> longer a concern.
>>
>> On 2021-02-04 11:26:56, "op" <520075...@qq.com> wrote:
>> A restart may cause data to be re-sent, so I want to add an alert.
>>
>> -- Original message --
>> From: "user-zh"
>> Sent: Thursday, February 4, 2021, 11:11 AM
>> To: "user-zh"
>> Subject: Re: How to tell inside the program whether the job has restarted
>>
>> What is the business requirement?
>>
>> Best,
>> tison.
>>
>> op <520075...@qq.com> wrote on Thu, Feb 4, 2021 at 11:04 AM:
>>
>> > Hi all,
>> >
>> > I have set a restart strategy in my program through RestartStrategies.
>> > Now I want to determine inside an operator whether a restart has been
>> > triggered. What are the ways to do this?
>
>


Re: [ANNOUNCE] Apache Flink 1.10.3 released

2021-01-31 Thread Zhu Zhu
Thanks Xintong for being the release manager and everyone who helped with
the release!

Cheers,
Zhu

Dian Fu wrote on Fri, Jan 29, 2021 at 5:56 PM:

> Thanks Xintong for driving this release!
>
> Regards,
> Dian
>
> On Jan 29, 2021, at 5:24 PM, Till Rohrmann wrote:
>
> Thanks Xintong for being our release manager. Well done!
>
> Cheers,
> Till
>
> On Fri, Jan 29, 2021 at 9:50 AM Yang Wang  wrote:
>
>> Thanks Xintong for driving this release.
>>
>> Best,
>> Yang
>>
>> Yu Li wrote on Fri, Jan 29, 2021 at 3:52 PM:
>>
>>> Thanks Xintong for being our release manager and everyone else who made
>>> the release possible!
>>>
>>> Best Regards,
>>> Yu
>>>
>>>
>>> On Fri, 29 Jan 2021 at 15:05, Xintong Song  wrote:
>>>
>>>> The Apache Flink community is very happy to announce the release of
>>>> Apache Flink 1.10.3, which is the third bugfix release for the Apache
>>>> Flink 1.10 series.
>>>>
>>>> Apache Flink® is an open-source stream processing framework for
>>>> distributed, high-performing, always-available, and accurate data
>>>> streaming applications.
>>>>
>>>> The release is available for download at:
>>>> https://flink.apache.org/downloads.html
>>>>
>>>> Please check out the release blog post for an overview of the
>>>> improvements for this bugfix release:
>>>> https://flink.apache.org/news/2021/01/29/release-1.10.3.html
>>>>
>>>> The full release notes are available in Jira:
>>>>
>>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348668
>>>>
>>>> We would like to thank all contributors of the Apache Flink community
>>>> who made this release possible!
>>>>
>>>> Regards,
>>>> Xintong Song
>>>>
>>>
>


Re: slot problem

2020-11-24 Thread Zhu Zhu
Each task will be assigned a dedicated thread for its data processing.
A slot can be shared by multiple tasks if they are in the same slot sharing
group[1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/concepts/runtime.html#task-slots-and-resources

Thanks,
Zhu

ゞ野蠻遊戲χ wrote on Wed, Nov 25, 2020 at 10:32 AM:

> Hi all
>
>Can only one thread run at a time for a slot? Or one slot can run
> multiple threads in parallel at the same time?
>
> Thanks,
> Jiazhi
>


Re: App gets stuck in Created State

2020-09-21 Thread Zhu Zhu
Hi Arpith,

All tasks being in the CREATED state indicates that no task has been
scheduled yet. It is strange if a job gets stuck in this state.
Could you share the job manager log so we can check what is
happening there?

Thanks,
Zhu

Arpith P wrote on Mon, Sep 21, 2020 at 3:52 PM:

> Hi,
>
> We have a Flink 1.8.0 cluster deployed in Hadoop distributed mode. I often
> see that Flink sits in the CREATED state even though Hadoop has enough
> resources. We have 4 operators with parallelism 15, 1 operator with
> parallelism 40, and 2 operators with parallelism 10. At submission time I
> pass 4 GB of TaskManager memory, 2 GB of JobManager memory, and 2 slots.
> This request should only take 20 containers and 40 vcores, but I see Flink
> over-allocating: 65 containers and 129 cores. I've attached snapshots for
> reference.
>
> Right now I'm passing:  -yD yarn.heartbeat.container-request-interval=1000
> -yD taskmanager.network.memory.fraction=0.045 -yD
> taskmanager.memory.preallocate=true.
>
> How do I control resource allocation?
>
>
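
The "20 containers and 40 vcores" expectation in the question can be reproduced with a back-of-the-envelope calculation. The sketch below is not Flink code and rests on two assumptions: all operators share one slot sharing group, so the job needs as many slots as its highest operator parallelism, and each TaskManager container gets one vcore per slot. The JobManager container is not counted. Class and method names are hypothetical.

```java
import java.util.Arrays;

/** Back-of-the-envelope estimate of slots and TaskManager containers. */
class SlotEstimate {

    /** With all operators in one slot sharing group, a job needs max(parallelism) slots. */
    static int requiredSlots(int[] operatorParallelisms) {
        return Arrays.stream(operatorParallelisms).max().orElse(0);
    }

    /** TaskManager containers needed to host the slots (ceiling division). */
    static int requiredTaskManagers(int slots, int slotsPerTaskManager) {
        if (slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("slotsPerTaskManager must be positive");
        }
        return (slots + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        // Parallelisms from the question: 4 operators at 15, 1 at 40, 2 at 10.
        int[] parallelisms = {15, 15, 15, 15, 40, 10, 10};
        int slotsPerTaskManager = 2;

        int slots = requiredSlots(parallelisms);
        int taskManagers = requiredTaskManagers(slots, slotsPerTaskManager);
        // Assumption: one vcore per slot in each TaskManager container.
        int vcores = taskManagers * slotsPerTaskManager;

        System.out.println(slots + " slots, " + taskManagers
                + " TaskManager containers, " + vcores + " vcores");
    }
}
```

An observed allocation far above this estimate suggests that extra container requests were sent, which is what the job manager log requested above would help confirm.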


[ANNOUNCE] Apache Flink 1.11.2 released

2020-09-16 Thread Zhu Zhu
The Apache Flink community is very happy to announce the release of Apache
Flink 1.11.2, which is the second bugfix release for the Apache Flink 1.11
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/news/2020/09/17/release-1.11.2.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348575

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Thanks,
Zhu


Re: Why setAllVerticesInSameSlotSharingGroupByDefault is set to false in batch mode

2020-09-15 Thread Zhu Zhu
Hi Zheng,

To divide managed memory among operators[1], we need to consider which tasks
will run in the same slot. In batch jobs, vertices in different regions may
not run at the same time. If we put them in the same slot sharing group,
running tasks may run slower with less managed memory, while managed memory
reserved for tasks that are not running yet is wasted.

More slots may be requested as a result. However, it is not necessary to add
more containers. Each slot now serves fewer tasks, so you can decrease the
slot size (by increasing "taskmanager.numberOfTaskSlots") so that the
previous number of containers is still enough. This works because you are
still running the same number of tasks at the same time, although they are
spread across more slots.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
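
To make the slot-size trade-off concrete, here is a small arithmetic sketch. It is not Flink code: it assumes a TaskManager's managed memory is split evenly across its slots, and the container count and memory size are made-up numbers chosen only for illustration.

```java
/** Sketch: per-slot managed memory and total slots as numberOfTaskSlots changes. */
class SlotSizing {

    /** Assumes managed memory is split evenly across the slots of a TaskManager. */
    static long managedMemoryPerSlotMb(long taskManagerManagedMemoryMb, int numberOfTaskSlots) {
        if (numberOfTaskSlots <= 0) {
            throw new IllegalArgumentException("numberOfTaskSlots must be positive");
        }
        return taskManagerManagedMemoryMb / numberOfTaskSlots;
    }

    /** Total slots offered by a fixed number of TaskManager containers. */
    static int totalSlots(int containers, int numberOfTaskSlots) {
        return containers * numberOfTaskSlots;
    }

    public static void main(String[] args) {
        int containers = 10;          // hypothetical cluster size
        long managedMemoryMb = 4096;  // hypothetical managed memory per TaskManager

        for (int slotsPerTm : new int[] {2, 4}) {
            System.out.println("numberOfTaskSlots=" + slotsPerTm
                    + ": total slots=" + totalSlots(containers, slotsPerTm)
                    + ", managed memory per slot="
                    + managedMemoryPerSlotMb(managedMemoryMb, slotsPerTm) + " MB");
        }
    }
}
```

Doubling the slots per TaskManager doubles the slots available from the same containers and halves each slot's share, which is acceptable when each slot hosts fewer concurrently running tasks.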

Thanks,
Zhu

zheng faaron wrote on Wed, Sep 16, 2020 at 10:58 AM:

> Hi All,
>
> I find that we set AllVerticesInSameSlotSharingGroupByDefault to false in
> Flink 1.10. This makes batch jobs request lots of containers. I'm not sure
> why we set it to false directly. I tried setting it to true and found that
> the batch job runs correctly with a small number of containers. Why don't we
> add a configuration option to let users configure it?
>
> Best,
> Faaron Zheng
>


Re: Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Zhu Zhu
Congratulations Dian!

Thanks,
Zhu

Zhijiang wrote on Thu, Aug 27, 2020 at 6:04 PM:

> Congrats, Dian!
>
> --
> From:Yun Gao 
> Send Time:Thursday, August 27, 2020 17:44
> To:dev ; Dian Fu ; user <
> u...@flink.apache.org>; user-zh 
> Subject:Re: Re: [ANNOUNCE] New PMC member: Dian Fu
>
> Congratulations Dian !
>
>  Best
>  Yun
>
>
> --
> Sender:Marta Paes Moreira
> Date:2020/08/27 17:42:34
> Recipient:Yuan Mei
> Cc:Xingbo Huang; jincheng sun >; dev; Dian Fu; user<
> u...@flink.apache.org>; user-zh
> Theme:Re: [ANNOUNCE] New PMC member: Dian Fu
>
> Congrats, Dian!
> On Thu, Aug 27, 2020 at 11:39 AM Yuan Mei  wrote:
>
> Congrats!
> On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang  wrote:
>
> Congratulations Dian!
>
> Best,
> Xingbo
> jincheng sun wrote on Thu, Aug 27, 2020 at 5:24 PM:
>
> Hi all,
>
>
> On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now part of 
> the Apache Flink Project Management Committee (PMC).
>
>
> Dian Fu has been very active on PyFlink component, working on various 
> important features, such as the Python UDF and Pandas integration, and keeps 
> checking and voting for our releases, and also has successfully produced two 
> releases(1.9.3&1.11.1) as RM, currently working as RM to push forward the 
> release of Flink 1.12.
>
> Please join me in congratulating Dian Fu for becoming a Flink PMC Member!
>
> Best,
> Jincheng(on behalf of the Flink PMC)
>
>
>


[ANNOUNCE] Apache Flink 1.10.2 released

2020-08-24 Thread Zhu Zhu
The Apache Flink community is very happy to announce the release of Apache
Flink 1.10.2, which is the second bugfix release for the Apache Flink 1.10
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/news/2020/08/25/release-1.10.2.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12347791

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Thanks,
Zhu


Re: Change in sub-task id assignment from 1.9 to 1.10?

2020-08-12 Thread Zhu Zhu
Hi Ken,

I am not aware of any such changes.
Also, Flink has never had logic designed to scatter subtasks of the same
operator across different taskmanagers.

One workaround for your problem could be to increase the parallelism of
your source vertex so that it is no smaller than that of any other operator;
then each slot can contain a source task. With config
cluster.evenly-spread-out-slots set to true, slots can be evenly distributed
across all available taskmanagers in most cases.

Thanks,
Zhu Zhu

Ken Krugler wrote on Fri, Aug 7, 2020 at 5:28 AM:

> Hi all,
>
> Was there any change in how sub-tasks get allocated to TMs, from Flink 1.9
> to 1.10?
>
> Specifically for consecutively numbered sub-tasks (e.g. 0, 1, 2) did it
> become more or less likely that they’d be allocated to the same Task
> Manager?
>
> Asking because a workflow that ran fine in 1.9 now has a “hot” TM that’s
> having trouble keeping up with a Kafka topic.
>
> The most promising explanation is that now there are three sub-tasks on
> the same TM that are reading from that topic, versus previously they’d be
> scattered across multiple TMs.
>
> But I don’t see significant changes in this area post 1.8
>
> Thanks,
>
> — Ken
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>
>


Re: In JobGraph, does an IntermediateDataSet contain multiple JobEdges?

2020-08-04 Thread Zhu Zhu
What you observed is right. At the moment, one IntermediateDataSet can have
only one consumer job edge in the production code path.

Thanks,
Zhu Zhu

yuehan1 wrote on Tue, Aug 4, 2020 at 5:14 PM:

> IntermediateDataSet.java has a JobEdge list named consumers.
> In which case does an IntermediateDataSet contain multiple JobEdges?
> From reading the code, it seems that an IntermediateDataSet can only have
> one JobEdge.
>
> // IntermediateDataSet.java
> private final List<JobEdge> consumers = new ArrayList<JobEdge>();
>
> thanks for your reply
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink jobs getting finished because of "Could not allocate the required slot within slot request timeout"

2020-07-29 Thread Zhu Zhu
Hi Sateesh,

Would you check Flink jobmanager log to see whether it has sent container
requests to YARN RM?
If the request is sent but not fulfilled, you will need to check the YARN
RM logs or the YARN cluster
resources at that time to see whether that container request is fulfillable.
The resources for a requested container can be found in Flink JM log.

Thanks,
Zhu Zhu

mars wrote on Wed, Jul 29, 2020 at 10:52 PM:

> Hi All,
>
>  I have an EMR Cluster with one Master Node and 3 worker Nodes ( it has
> auto
> scaling enabled and the max no.of worker nodes can go up to 8).
>
> I have 3 Spark Jobs that are running currently on the Cluster.
>
> I submitted 3 Flink jobs and all of them finished with a "slots are not
> available" error.
>
> In flink-conf.yaml I have
>
> jobmanager.heap.mb: 4096
> taskmanager.heap.mb: 4096
>
> And the Master node has 16 vcores and 64Gb Memory and each worker node has
> 4
> vcores and 16GB Memory.
>
> And when i am submitting the flink job i am passing the arg (-p 2) which
> should set the parallelism to 2.
>
> And YARN UI is showing the following stats
>
> Containers Running : 7
> Memory Used  : 21.63GB
> Memory Total  : 36GB
> vCores Used   : 7
> VCores Total   : 12
> Active Nodes  : 3
>
> I cannot figure out why the slots cannot be allocated to the Flink jobs.
> First of all, even with 3 active nodes there are still 5 vcores available.
> Moreover, auto scaling is enabled for this cluster, so EMR should allocate
> up to 8 nodes, i.e. 5 more nodes should be allocated if required.
>
> Appreciate any insights.
>
> Also i cannot find the task manager logs on any of the nodes.
>
> Thanks
> Sateesh
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: How does TaskManager announce JobManager about available ResultPartitions?

2020-07-20 Thread Zhu Zhu
Hi Joseph,
The availability of a pipelined result partition is reported to the JM
via the scheduleOrUpdateConsumers RPC.

Just want to mention that it's better to send such questions to the user
mailing list.

Thanks,
Zhu Zhu

Fork Joseph wrote on Tue, Jul 21, 2020 at 3:30 AM:

> Hi,
>
> According to description in
>
> https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks
> ,
> TaskManager announces (tells) JobManager about available
> ResultSubpartitions  (RSs) to let another TM to initiate the transfer.
> However, I can’t find where in the codebase TM actually announces JM about
> RSs for streaming (Pipelined) mode.
>
> Thanks!
> Joseph
>


Re: NoResourceAvailableException and JobNotFound Errors

2020-06-02 Thread Zhu Zhu
Hi Prasanna,

The job failed because it could not acquire enough slots to run its tasks.
Did you launch any task manager?

The JobNotFound exception is thrown because someone (possibly the Flink UI)
sent a query for a job that does not exist in the Flink cluster.
From the log you attached, the job id of your job
is c23a172cda6cc659296af6452ff57f45, but the REST request asks for the info
of job be3d6b9751b6e9c509b9bedeb581a72e.

Thanks,
Zhu Zhu


Prasanna kumar wrote on Wed, Jun 3, 2020 at 2:16 AM:

> Hi ,
>
> I am running flink locally in my machine with following configurations.
>
> # The RPC port where the JobManager is reachable.
>
> jobmanager.rpc.port: 6123
>
>
> # The heap size for the JobManager JVM
>
> jobmanager.heap.size: 1024m
>
>
> # The heap size for the TaskManager JVM
>
> taskmanager.heap.size: 1024m
>
>
> # The number of task slots that each TaskManager offers. Each slot runs
> one parallel pipeline.
>
> taskmanager.numberOfTaskSlots: 8
>
>
> # The parallelism used for programs that did not specify and other
> parallelism.
>
> parallelism.default: 1
>
> When i run my program i end up getting
>
> Caused by:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate all requires slots within timeout of 30 ms. Slots
> required: 1, slots allocated: 0
>
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$3(ExecutionGraph.java:991)
>
> at
> java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
>
> at
> java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
>
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>
> JobManager Logs
>
> 2020-06-02 23:25:09,992 ERROR 
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler   - Exception 
> occurred in REST handler.
> org.apache.flink.runtime.rest.NotFoundException: Job 
> be3d6b9751b6e9c509b9bedeb581a72e not found
>
> Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could 
> not find Flink job (be3d6b9751b6e9c509b9bedeb581a72e)
>   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGatewayFuture(Dispatcher.java:766)
>   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.requestJob(Dispatcher.java:485)
>
>  Finally its shutdown
>
> 2020-06-02 23:30:05,427 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping 
> checkpoint coordinator for job c23a172cda6cc659296af6452ff57f45.
> 2020-06-02 23:30:05,427 INFO  
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  - 
> Shutting down
> 2020-06-02 23:30:05,428 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job 
> c23a172cda6cc659296af6452ff57f45 reached globally terminal state FAILED.
> 2020-06-02 23:30:05,449 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Stopping the JobMaster for job Flink Streaming Single 
> Environment(c23a172cda6cc659296af6452ff57f45).
> 2020-06-02 23:30:05,450 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Close ResourceManager connection 
> 9da4590b1bbc3c104e70e270988db461: JobManager is shutting down..
> 2020-06-02 23:30:05,450 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Suspending 
> SlotPool.
> 2020-06-02 23:30:05,450 INFO  
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - 
> Disconnect job manager 
> 0...@akka.tcp://flink@localhost:6123/user/jobmanager_0
>  for job c23a172cda6cc659296af6452ff57f45 from the resource manager.
> 2020-06-02 23:30:05,451 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Stopping 
> SlotPool.
> 2020-06-02 23:30:05,451 INFO  
> org.apache.flink.runtime.jobmaster.JobManagerRunner   - 
> JobManagerRunner already shutdown.
>
>
> Thanks,
> Prasanna.
>


Re: Apache Flink - Question about application restart

2020-05-28 Thread Zhu Zhu
Restarting the Flink master does not change the job ID within one YARN
application.
Simply put, in a YARN application that runs a Flink cluster, the job ID
of a job does not change once the job is submitted.
You can even submit a Flink application multiple times to that cluster (if
it is in session mode), but each submission will be treated as a different job
and will have a different job ID.

Thanks,
Zhu Zhu

On Fri, May 29, 2020 at 4:59 AM M Singh wrote:

> Thanks Till - in the case of restart of flink master - I believe the jobid
> will be different.  Thanks
>
> On Thursday, May 28, 2020, 11:33:38 AM EDT, Till Rohrmann <
> trohrm...@apache.org> wrote:
>
>
> Hi,
>
> Yarn won't resubmit the job. In case of a process failure where Yarn
> restarts the Flink Master, the Master will recover the submitted jobs from
> a persistent storage system.
>
> Cheers,
> Till
>
> On Thu, May 28, 2020 at 4:05 PM M Singh  wrote:
>
> Hi Till/Zhu/Yang:  Thanks for your replies.
>
> So just to clarify - the job id remains the same if the job restarts have not
> been exhausted.  Does Yarn also resubmit the job in case of failures and, if
> so, is the job id different?
>
> Thanks
> On Wednesday, May 27, 2020, 10:05:40 AM EDT, Till Rohrmann <
> trohrm...@apache.org> wrote:
>
>
> Hi,
>
> if you submit the same job multiple times, then it will be assigned a
> different JobID every time. For Flink, different job submissions are
> considered to be different jobs. Once a job has been submitted, it will
> keep the same JobID which is important in order to retrieve the checkpoints
> associated with this job.
>
> Cheers,
> Till
>
> On Tue, May 26, 2020 at 12:42 PM M Singh  wrote:
>
> Hi Zhu Zhu:
>
> I have another clarification - it looks like if I run the same app multiple
> times - its job id changes.  So it looks like even though the graph is the
> same the job id is not dependent on the job graph only since with different
> runs of the same app it is not the same.
>
> Please let me know if I've missed anything.
>
> Thanks
>
> On Monday, May 25, 2020, 05:32:39 PM EDT, M Singh 
> wrote:
>
>
> Hi Zhu Zhu:
>
> Just to clarify - from what I understand, EMR also has by default restart
> times (I think it is 3). So if the EMR restarts the job - the job id is the
> same since the job graph is the same.
>
> Thanks for the clarification.
>
> On Monday, May 25, 2020, 04:01:17 AM EDT, Yang Wang 
> wrote:
>
>
> Just sharing some additional information.
>
> When a Flink application deployed on Yarn exhausts its restart policy,
> the whole application will fail. If you start another instance (Yarn
> application), then even if high availability is configured, we cannot
> recover from the latest checkpoint because the clusterId (i.e. applicationId)
> has changed.
>
>
> Best,
> Yang
>
> On Mon, May 25, 2020 at 11:17 AM Zhu Zhu wrote:
>
> Hi M,
>
> Regarding your questions:
> 1. yes. The id is fixed once the job graph is generated.
> 2. yes
>
> Regarding yarn mode:
> 1. the job id stays the same because the job graph is generated once
> on the client side and persisted in DFS for reuse
> 2. yes if high availability is enabled
>
> Thanks,
> Zhu Zhu
>
> On Sat, May 23, 2020 at 4:06 AM M Singh wrote:
>
> Hi Flink Folks:
>
> If I have a Flink Application with 10 restarts, if it fails and restarts,
> then:
>
> 1. Does the job have the same id ?
> 2. Does the automatically restarting application pick up from the last
> checkpoint? I am assuming it does but just want to confirm.
>
> Also, if it is running on AWS EMR I believe EMR/Yarn is configured to
> restart the job 3 times (after it has exhausted its restart policy).  If
> that is the case:
> 1. Does the job get a new id ? I believe it does, but just want to confirm.
> 2. Does the Yarn restart honor the last checkpoint ?  I believe, it does
> not, but is there a way to make it restart from the last checkpoint of the
> failed job (after it has exhausted its restart policy) ?
>
> Thanks
>
>
>


Re: Apache Flink - Question about application restart

2020-05-27 Thread Zhu Zhu
Hi M,

Sorry I missed your message.
The JobID will not change for a generated JobGraph. However, a new JobGraph
is generated each time a job is submitted,
so multiple submissions will have multiple JobGraphs. This is because
different submissions are considered different jobs, as Till mentioned.
For example, you can submit an application to a cluster multiple
times at the same time, and different JobIDs are needed to tell them apart.
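
To make the distinction concrete, here is a minimal sketch in plain Java. It
does not use any Flink classes: JobIdSketch, submit and
recoverAfterMasterRestart are hypothetical names, a random UUID stands in for
the JobID, and an in-memory map stands in for the persistent storage that the
master recovers submitted jobs from.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Illustration only: models the job id rules described above, not Flink's implementation. */
public class JobIdSketch {
    // Stand-in for the persistent storage holding submitted jobs (an assumption).
    private final Map<String, String> persistedJobs = new HashMap<>();

    /** Every submission produces a new job graph and therefore a fresh id. */
    public String submit(String applicationName) {
        String jobId = UUID.randomUUID().toString().replace("-", "");
        persistedJobs.put(jobId, applicationName);
        return jobId;
    }

    /** A master restart reloads the persisted job, so the id is unchanged. */
    public String recoverAfterMasterRestart(String jobId) {
        if (!persistedJobs.containsKey(jobId)) {
            throw new IllegalArgumentException("Unknown job id: " + jobId);
        }
        return jobId;
    }

    public static void main(String[] args) {
        JobIdSketch cluster = new JobIdSketch();
        String first = cluster.submit("my-app");
        String second = cluster.submit("my-app");
        System.out.println("ids differ across submissions: " + !first.equals(second));
        System.out.println("id stable across recovery: "
                + first.equals(cluster.recoverAfterMasterRestart(first)));
    }
}
```

The point of the sketch is only that the id belongs to a submission, not to
the application code: submitting twice gives two ids, while recovering an
already submitted job keeps its id.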

Thanks,
Zhu Zhu

On Wed, May 27, 2020 at 10:05 PM Till Rohrmann wrote:

> Hi,
>
> if you submit the same job multiple times, then it will be assigned a
> different JobID every time. For Flink, different job submissions are
> considered to be different jobs. Once a job has been submitted, it will
> keep the same JobID which is important in order to retrieve the checkpoints
> associated with this job.
>
> Cheers,
> Till
>
> On Tue, May 26, 2020 at 12:42 PM M Singh  wrote:
>
>> Hi Zhu Zhu:
>>
>> I have another clarification - it looks like if I run the same app multiple
>> times - its job id changes.  So it looks like even though the graph is the
>> same the job id is not dependent on the job graph only since with different
>> runs of the same app it is not the same.
>>
>> Please let me know if I've missed anything.
>>
>> Thanks
>>
>> On Monday, May 25, 2020, 05:32:39 PM EDT, M Singh 
>> wrote:
>>
>>
>> Hi Zhu Zhu:
>>
>> Just to clarify - from what I understand, EMR also has by default restart
>> times (I think it is 3). So if the EMR restarts the job - the job id is the
>> same since the job graph is the same.
>>
>> Thanks for the clarification.
>>
>> On Monday, May 25, 2020, 04:01:17 AM EDT, Yang Wang <
>> danrtsey...@gmail.com> wrote:
>>
>>
>> Just sharing some additional information.
>>
>> When a Flink application deployed on Yarn exhausts its restart policy,
>> the whole application will fail. If you start another instance (Yarn
>> application), then even if high availability is configured, we cannot
>> recover from the latest checkpoint because the clusterId (i.e. applicationId)
>> has changed.
>>
>>
>> Best,
>> Yang
>>
>> On Mon, May 25, 2020 at 11:17 AM Zhu Zhu wrote:
>>
>> Hi M,
>>
>> Regarding your questions:
>> 1. yes. The id is fixed once the job graph is generated.
>> 2. yes
>>
>> Regarding yarn mode:
>> 1. the job id stays the same because the job graph is generated once
>> on the client side and persisted in DFS for reuse
>> 2. yes if high availability is enabled
>>
>> Thanks,
>> Zhu Zhu
>>
>> On Sat, May 23, 2020 at 4:06 AM M Singh wrote:
>>
>> Hi Flink Folks:
>>
>> If I have a Flink Application with 10 restarts, if it fails and restarts,
>> then:
>>
>> 1. Does the job have the same id ?
>> 2. Does the automatically restarting application pick up from the last
>> checkpoint? I am assuming it does but just want to confirm.
>>
>> Also, if it is running on AWS EMR I believe EMR/Yarn is configured to
>> restart the job 3 times (after it has exhausted its restart policy).  If
>> that is the case:
>> 1. Does the job get a new id ? I believe it does, but just want to
>> confirm.
>> 2. Does the Yarn restart honor the last checkpoint ?  I believe, it does
>> not, but is there a way to make it restart from the last checkpoint of the
>> failed job (after it has exhausted its restart policy) ?
>>
>> Thanks
>>
>>
>>


Re: Apache Flink - Question about application restart

2020-05-24 Thread Zhu Zhu
Hi M,

Regarding your questions:
1. yes. The id is fixed once the job graph is generated.
2. yes

Regarding yarn mode:
1. the job id stays the same because the job graph is generated once
on the client side and persisted in DFS for reuse
2. yes if high availability is enabled

Thanks,
Zhu Zhu

On Sat, May 23, 2020 at 4:06 AM M Singh wrote:

> Hi Flink Folks:
>
> If I have a Flink Application with 10 restarts, if it fails and restarts,
> then:
>
> 1. Does the job have the same id ?
> 2. Does the automatically restarting application pick up from the last
> checkpoint? I am assuming it does but just want to confirm.
>
> Also, if it is running on AWS EMR I believe EMR/Yarn is configured to
> restart the job 3 times (after it has exhausted its restart policy).  If
> that is the case:
> 1. Does the job get a new id ? I believe it does, but just want to confirm.
> 2. Does the Yarn restart honor the last checkpoint ?  I believe, it does
> not, but is there a way to make it restart from the last checkpoint of the
> failed job (after it has exhausted its restart policy) ?
>
> Thanks
>
>
>


Re: [ANNOUNCE] Apache Flink 1.10.1 released

2020-05-17 Thread Zhu Zhu
Thanks Yu for being the release manager. Thanks everyone who made this
release possible!

Thanks,
Zhu Zhu

On Fri, May 15, 2020 at 7:51 PM Benchao Li wrote:

> Thanks Yu for the great work, and everyone else who made this possible.
>
> On Fri, May 15, 2020 at 6:55 PM Dian Fu wrote:
>
>> Thanks Yu for managing this release and everyone else who made this
>> release possible. Good work!
>>
>> Regards,
>> Dian
>>
>> On May 15, 2020, at 6:26 PM, Till Rohrmann wrote:
>>
>> Thanks Yu for being our release manager and everyone else who made the
>> release possible!
>>
>> Cheers,
>> Till
>>
>> On Fri, May 15, 2020 at 9:15 AM Congxian Qiu 
>> wrote:
>>
>>> Thanks a lot for the release and your great job, Yu!
>>> Also thanks to everyone who made this release possible!
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> On Thu, May 14, 2020 at 1:59 AM Yu Li wrote:
>>>
>>>> The Apache Flink community is very happy to announce the release of
>>>> Apache Flink 1.10.1, which is the first bugfix release for the Apache Flink
>>>> 1.10 series.
>>>>
>>>> Apache Flink® is an open-source stream processing framework for
>>>> distributed, high-performing, always-available, and accurate data streaming
>>>> applications.
>>>>
>>>> The release is available for download at:
>>>> https://flink.apache.org/downloads.html
>>>>
>>>> Please check out the release blog post for an overview of the
>>>> improvements for this bugfix release:
>>>> https://flink.apache.org/news/2020/05/12/release-1.10.1.html
>>>>
>>>> The full release notes are available in Jira:
>>>>
>>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12346891
>>>>
>>>> We would like to thank all contributors of the Apache Flink community
>>>> who made this release possible!
>>>>
>>>> Regards,
>>>> Yu
>>>>
>>>
>>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


Re: Flink restart strategy on specific exception

2020-05-14 Thread Zhu Zhu
Ticket FLINK-17714 has been created to track this requirement.

Thanks,
Zhu Zhu

On Wed, May 13, 2020 at 8:30 PM Till Rohrmann wrote:

> Yes, you are right Zhu Zhu. Extending
> the RestartBackoffTimeStrategyFactoryLoader to also load custom
> RestartBackoffTimeStrategies sounds like a good improvement for the future.
>
> @Ken Krugler , the old RestartStrategy
> interface did not provide the cause of the failure, unfortunately.
>
> Cheers,
> Till
>
> On Wed, May 13, 2020 at 7:55 AM Zhu Zhu  wrote:
>
>> Hi Ken,
>>
>> Custom restart strategies were an experimental feature and have been
>> deprecated since 1.10. [1]
>> That's why you cannot find any documentation for them.
>>
>> The old RestartStrategy has been deprecated and replaced by
>> RestartBackoffTimeStrategy since 1.10
>> (unless you are using the legacy scheduler, which is also deprecated).
>> The new restart strategy, RestartBackoffTimeStrategy, is able to
>> know the exact failure cause.
>> However, the new restart strategy does not support customization at the
>> moment.
>> Your requirement sounds reasonable to me, and I think a custom (new) restart
>> strategy can be something to support later.
>>
>> @Till Rohrmann  @Gary Yao  what
>> do you think?
>>
>> [1]
>> https://lists.apache.org/thread.html/6ed95eb6a91168dba09901e158bc1b6f4b08f1e176db4641f79de765%40%3Cdev.flink.apache.org%3E
>>
>> Thanks,
>> Zhu Zhu
>>
>> On Wed, May 13, 2020 at 7:34 AM Ken Krugler wrote:
>>
>>> Hi Till,
>>>
>>> Sorry, missed the key question…in the RestartStrategy.restart() method,
>>> I don’t see any good way to get at the underlying exception.
>>>
>>> I can cast the RestartCallback to an ExecutionGraphRestartCallback, but
>>> I still need access to the private execGraph to be able to get at the
>>> failure info. Is there some other way in the restart handler to get at this?
>>>
>>> And yes, I meant to note you’d mentioned the required static method in
>>> your email, I was asking about documentation for it.
>>>
>>> Thanks,
>>>
>>> — Ken
>>>
>>> ===
>>> Sorry to resurface an ancient question, but is there a working example
>>> anywhere of setting a custom restart strategy?
>>>
>>> Asking because I’ve been wandering through the Flink 1.9 code base for a
>>> while, and the restart strategy implementation is…pretty tangled.
>>>
>>> From what I’ve been able to figure out, you have to provide a factory
>>> class, something like this:
>>>
>>> Configuration config = new Configuration();
>>> config.setString(ConfigConstants.RESTART_STRATEGY,
>>> MyRestartStrategyFactory.class.getCanonicalName());
>>> StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.createLocalEnvironment(4, config);
>>>
>>> That factory class should extend RestartStrategyFactory, but it also
>>> needs to implement a static method that looks like:
>>>
>>> public static MyRestartStrategyFactory
>>> createFactory(Configuration config) {
>>> return new MyRestartStrategyFactory();
>>> }
>>>
>>> I wasn’t able to find any documentation that mentioned this particular
>>> method being a requirement.
>>>
>>> And also the documentation at
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#fault-tolerance
>>> doesn’t mention you can set a custom class name for the restart-strategy.
>>>
>>> Thanks,
>>>
>>> — Ken
>>>
>>>
>>> On Nov 22, 2018, at 8:18 AM, Till Rohrmann  wrote:
>>>
>>> Hi Kasif,
>>>
>>> I think in this situation it is best if you defined your own custom
>>> RestartStrategy by specifying a class which has a `RestartStrategyFactory
>>> createFactory(Configuration configuration)` method as `restart-strategy:
>>> MyRestartStrategyFactoryFactory` in `flink-conf.yaml`.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Nov 22, 2018 at 7:18 AM Ali, Kasif  wrote:
>>>
>>>> Hello,
>>>>
>>>>
>>>>
>>>> Looking at existing restart strategies they are kind of generic. We
>>>> have a requirement to restart the job only in case of specific
>>>> exception/issues.
>>>>
>>>> What would be the best way to ha

Re: Flink restart strategy on specific exception

2020-05-12 Thread Zhu Zhu
Hi Ken,

Custom restart strategies were an experimental feature and have been deprecated
since 1.10. [1]
That's why you cannot find any documentation for them.

The old RestartStrategy has been deprecated and replaced by
RestartBackoffTimeStrategy since 1.10
(unless you are using the legacy scheduler, which is also deprecated).
The new restart strategy, RestartBackoffTimeStrategy, is able to know
the exact failure cause.
However, the new restart strategy does not support customization at the
moment.
Your requirement sounds reasonable to me, and I think a custom (new) restart
strategy can be something to support later.
@Till Rohrmann  @Gary Yao  what do
you think?

[1]
https://lists.apache.org/thread.html/6ed95eb6a91168dba09901e158bc1b6f4b08f1e176db4641f79de765%40%3Cdev.flink.apache.org%3E
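
For illustration, the decision such an exception-aware strategy would have to
make can be sketched in plain Java. This is not a Flink extension point (as
said above, customization is not supported at the moment), and
ExceptionAwareRestartSketch with its methods is a hypothetical, self-contained
class that only assumes the strategy is handed the failure cause.

```java
/** Illustration only: restart decision based on the type of the failure cause. */
public class ExceptionAwareRestartSketch {
    private final Class<? extends Throwable> restartOn;
    private final int maxAttempts;
    private int attempts = 0;
    private boolean lastFailureMatches = false;

    public ExceptionAwareRestartSketch(Class<? extends Throwable> restartOn, int maxAttempts) {
        this.restartOn = restartOn;
        this.maxAttempts = maxAttempts;
    }

    /** Records a failure and checks whether restartOn occurs anywhere in its cause chain. */
    public void notifyFailure(Throwable failure) {
        attempts++;
        lastFailureMatches = false;
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (restartOn.isInstance(t)) {
                lastFailureMatches = true;
                break;
            }
        }
    }

    /** Restart only if the last failure was of the configured type and attempts remain. */
    public boolean canRestart() {
        return lastFailureMatches && attempts <= maxAttempts;
    }

    public static void main(String[] args) {
        ExceptionAwareRestartSketch strategy =
                new ExceptionAwareRestartSketch(java.util.concurrent.TimeoutException.class, 2);
        strategy.notifyFailure(new RuntimeException("wrapped",
                new java.util.concurrent.TimeoutException("slot request timed out")));
        System.out.println("restart after timeout: " + strategy.canRestart());
        strategy.notifyFailure(new IllegalStateException("application bug"));
        System.out.println("restart after other failure: " + strategy.canRestart());
    }
}
```

Walking the cause chain matters because failures usually arrive wrapped in
other exceptions, so checking only the top-level type would miss most matches.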

Thanks,
Zhu Zhu

On Wed, May 13, 2020 at 7:34 AM Ken Krugler wrote:

> Hi Till,
>
> Sorry, missed the key question…in the RestartStrategy.restart() method, I
> don’t see any good way to get at the underlying exception.
>
> I can cast the RestartCallback to an ExecutionGraphRestartCallback, but I
> still need access to the private execGraph to be able to get at the failure
> info. Is there some other way in the restart handler to get at this?
>
> And yes, I meant to note you’d mentioned the required static method in
> your email, I was asking about documentation for it.
>
> Thanks,
>
> — Ken
>
> ===
> Sorry to resurface an ancient question, but is there a working example
> anywhere of setting a custom restart strategy?
>
> Asking because I’ve been wandering through the Flink 1.9 code base for a
> while, and the restart strategy implementation is…pretty tangled.
>
> From what I’ve been able to figure out, you have to provide a factory
> class, something like this:
>
> Configuration config = new Configuration();
> config.setString(ConfigConstants.RESTART_STRATEGY,
> MyRestartStrategyFactory.class.getCanonicalName());
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createLocalEnvironment(4, config);
>
> That factory class should extend RestartStrategyFactory, but it also needs
> to implement a static method that looks like:
>
> public static MyRestartStrategyFactory
> createFactory(Configuration config) {
> return new MyRestartStrategyFactory();
> }
>
> I wasn’t able to find any documentation that mentioned this particular
> method being a requirement.
>
> And also the documentation at
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#fault-tolerance
> doesn’t mention you can set a custom class name for the restart-strategy.
>
> Thanks,
>
> — Ken
>
>
> On Nov 22, 2018, at 8:18 AM, Till Rohrmann  wrote:
>
> Hi Kasif,
>
> I think in this situation it is best if you defined your own custom
> RestartStrategy by specifying a class which has a `RestartStrategyFactory
> createFactory(Configuration configuration)` method as `restart-strategy:
> MyRestartStrategyFactoryFactory` in `flink-conf.yaml`.
>
> Cheers,
> Till
>
> On Thu, Nov 22, 2018 at 7:18 AM Ali, Kasif  wrote:
>
>> Hello,
>>
>>
>>
>> Looking at existing restart strategies they are kind of generic. We have
>> a requirement to restart the job only in case of specific exception/issues.
>>
>> What would be the best way to have a restart strategy which is based on
>> a few rules, like looking at a particular type of exception or some extra
>> condition checks which are application specific?
>>
>>
>>
>> Just a background on one specific issue which invoked this requirement is
>> slots not getting released when the job finishes. In our applications, we
>> keep track of jobs submitted with the amount of parallelism allotted to
>> it.  Once the job finishes we assume that the slots are free and try to
>> submit next set of jobs which at times fail with error  “not enough slots
>> available”.
>>
>>
>>
>> So we think a job restart can solve this issue, but we want to
>> restart only if this particular situation is encountered.
>>
>>
>>
>> Please let us know if there are better ways to solve this problem other
>> than a restart strategy.
>>
>>
>>
>> Thanks,
>>
>> Kasif
>>
>>
>>
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>
>


Re: Flink 1.10.0 failover

2020-04-26 Thread Zhu Zhu
It seems something went wrong in the task managers and led to
heartbeat timeouts.
These TMs were not released by Flink; they lost their connections to the master
node.
I think you need to check the TM logs to see what happened there.

Thanks,
Zhu Zhu

On Sun, Apr 26, 2020 at 2:13 PM seeksst wrote:

> Thank you for your reply.
>
>
> I forgot to provide some information. I use 'run -m yarn-cluster’ to start
> my job, which means ‘run a single flink job on yarn’. After one minute, the
> job throws an exception: java.util.concurrent.TimeoutException: The heartbeat
> of TaskManager with id container_1581388570291_0133_01_03 timed out.
>
>
> First, the job starts with two TaskManagers:
> org.apache.flink.yarn.YarnResourceManager - Registering TaskManager with
> ResourceID container_1581388570291_0133_01_03 (xxx.xxx.xxx.xxx:38211)
> org.apache.flink.yarn.YarnResourceManager - Registering TaskManager with
> ResourceID container_1581388570291_0133_01_02 (xxx.xxx.xxx.xxx:33715)
>
> Then, 003 times out and throws an exception:
> org.apache.flink.yarn.YarnResourceManager - The heartbeat of TaskManager
> with id container_1581388570291_0133_01_03 timed out.
> org.apache.flink.yarn.YarnResourceManager - Closing TaskExecutor
> connection container_1581388570291_0133_01_03 because: The heartbeat of
> TaskManager with id container_1581388570291_0133_01_03 timed out.
>
>
> Switch RUNNING to CANCELING, switch CANCELING to CANCELED.
>
> After 10 Seconds(I used fixedDelayRestart), Switch Restarting TO RUNNING.
>
> switched from CREATED to SCHEDULED.
>
> Requesting new slot [SlotRequestId{c6c137acf7ef9fd639157f0e9495fe42}] and
> profile ResourceProfile{UNKNOWN} from resource manager.
> Requesting new TaskExecutor container with resources  vCores:3>. Number pending requests 1.
> Requesting new TaskExecutor container with resources  vCores:3>. Number pending requests 2.
>
> The heartbeat of TaskManager with id
> container_1581388570291_0133_01_02 timed out.
> Closing TaskExecutor connection container_1581388570291_0133_01_02
> because: The heartbeat of TaskManager with id
> container_1581388570291_0133_01_02 timed out.
>
>
> org.apache.flink.yarn.YarnResourceManager - Received 1 containers with 2
> pending container requests.
> org.apache.flink.yarn.YarnResourceManager - Removing container request
> Capability[]Priority[1]. Pending container requests
> 1.
> org.apache.flink.yarn.YarnResourceManager - TaskExecutor
> container_1581388570291_0133_01_04 will be started
> org.apache.flink.yarn.YarnResourceManager - Registering TaskManager with
> ResourceID container_1581388570291_0133_01_04 (xxx.xxx.xxx.xxx:40463)
> akka.remote.transport.netty.NettyTransport - Remote connection to [null]
> failed with java.net.ConnectException: Connection refused:xxx.xxx.xxx.xxx:
> 33715
>
>
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate the required slot within slot request timeout. Please
> make sure that the cluster has enough resources.
>
> And it restarts again. Switched from SCHEDULED to CANCELING. Switched from
> CANCELING to CANCELED.
> 10 Seconds later, switched from CREATED to SCHEDULED.
> akka.remote.transport.netty.NettyTransport - Remote connection to [null]
> failed with java.net.ConnectException: Connection refused:
> (xxx.xxx.xxx.xxx:33715)
>
> the port 33715 is container_1581388570291_0133_01_02, it was closed
> already.
> then
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate the required slot within slot request timeout. Please
> make sure that the cluster has enough resources.
>
>
> 10 seconds later, on the third restart, it only reports
> NoResourceAvailableException, and nothing about 33715.
>
> Now the job has only one task manager, 004, but Yarn has no resources
> left. In my last email, there was no task manager and no resources.
>
> I don’t know what makes this happen. There are enough resources if all the old
> task managers are released.
> Sometimes the job can create one, sometimes none. This never happened on
> 1.8.2; I use the same cluster and job, just a different Flink version.
> The job may fail and auto-recover. But in 1.10.0, it seems Yarn misses some
> task manager failures and does not release resources, so new ones
> can’t be created.
>
> What else should I do?
> Thanks a lot.
>  Original message
> *From:* Zhu Zhu
> *To:* seeksst
> *Cc:* user
> *Sent:* Sunday, April 26, 2020, 11:52
> *Subject:* Re: Flink 1.10.0 failover
>
> Sorry I did not quite understand the problem.
> Do you mean a failed job does not release resources to yarn?
>  - if so, is the job in restarting process? A job in recovery will reuse
> the slots so they will not be rel

Re: Flink 1.10.0 failover

2020-04-25 Thread Zhu Zhu
Sorry, I did not quite understand the problem.
Do you mean a failed job does not release resources to Yarn?
 - If so, is the job in the process of restarting? A job in recovery will reuse
the slots, so they will not be released.
Or can a failed job not acquire slots when it is restarted in auto-recovery?
 - If so, normally the job should be in a loop like (restarting tasks ->
allocating slots -> failing because it cannot acquire enough slots ->
restarting tasks -> ...). Would you check whether the job is in such a loop?
Or can the job not allocate enough slots even if the cluster has enough
resources?

Thanks,
Zhu Zhu



On Sun, Apr 26, 2020 at 11:21 AM seeksst wrote:

> Hi,
>
>
> Recently, I found a problem when a job failed in 1.10.0: Flink didn’t
> release resources first.
>
>
>
>  You can see I used Flink on Yarn, and it doesn’t allocate a task
> manager, because no more memory is left.
>
>  If I cancel the job, the cluster has more memory.
>
>  In 1.8.2, the job will restart normally. Is this a bug?
>
>  Thanks.
>


Re: Flink job didn't restart when a task failed

2020-04-15 Thread Zhu Zhu
Sorry, I made a mistake. Even if it's the case I had guessed, you will not
get a log "Task {} is already in state FAILED." because that task was
already unregistered before trying to send the state update to the JM.
Unfortunately, we currently have no log which can be used to prove it.
Just to confirm: the line "FOG_PREDICTION_FUNCTION (15/20) (
3086efd0e57612710d0ea74138c01090) switched from RUNNING to FAILED" does not
appear in the JM log, right? This might be a case where the message was
lost on the network, which should be rare. Do you encounter it often?

Thanks,
Zhu Zhu

On Wed, Apr 15, 2020 at 9:16 AM Hanson, Bruce wrote:

> Hi Zhu Zhu (and Till),
>
>
>
> Thanks for your thoughts on this problem. I do not see a message like the
> one you mention "Task {} is already in state FAILED." I have attached a
> file with all the task manager logs that we received at the time this
> happened. As you see, there aren’t many. We turned on debug logging for
> “org.apache.flink” on this job this afternoon so maybe we’ll find something
> interesting if/when the issue happens again. I do hope we can catch it in
> the act.
>
>
>
> -Bruce
>
>
>
> --
>
>
>
>
>
> *From: *Zhu Zhu 
> *Date: *Monday, April 13, 2020 at 9:29 PM
> *To: *Till Rohrmann 
> *Cc: *Aljoscha Krettek , user ,
> Gary Yao 
> *Subject: *Re: Flink job didn't restart when a task failed
>
>
>
> Sorry for not following this ML earlier.
>
>
>
> I think the cause might be that the final state ('FAILED') update message
> to JM is lost. TaskExecutor will simply fail the task (which does not take
> effect in this case since the task is already FAILED) and will not update
> the task state again in this case.
>
> @Bruce would you take a look at the TM log? If the guess is right, in task
> manager logs there will be one line "Task {} is already in state FAILED."
>
>
>
> Thanks,
>
> Zhu Zhu
>
>
>
> On Fri, Apr 10, 2020 at 12:51 AM Till Rohrmann wrote:
>
> For future reference, here is the issue to track the reconciliation logic
> [1].
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-17075
>
>
>
> Cheers,
>
> Till
>
>
>
> On Thu, Apr 9, 2020 at 6:47 PM Till Rohrmann  wrote:
>
> Hi Bruce,
>
>
>
> what you are describing sounds indeed quite bad. Quite hard to say whether
> we fixed such an issue in 1.10. It is definitely worth a try to upgrade,
> though.
>
>
>
> In order to further debug the problem, it would be really great if you
> could provide us with the log files of the JobMaster and the TaskExecutor.
> Ideally on debug log level if you have them.
>
>
>
> One thing which we wanted to add is sending the current task statuses as
> part of the heartbeat from the TM to the JM. Having this information would
> allow us to reconcile a situation like you are describing.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Thu, Apr 9, 2020 at 1:57 PM Aljoscha Krettek 
> wrote:
>
> Hi,
>
> this indeed seems very strange!
>
> @Gary Could you maybe have a look at this since you work/worked quite a
> bit on the scheduler?
>
> Best,
> Aljoscha
>
> On 09.04.20 05:46, Hanson, Bruce wrote:
> > Hello Flink folks:
> >
> > We had a problem with a Flink job the other day that I haven’t seen
> before. One task encountered a failure and switched to FAILED (see the full
> exception below). After the failure, the task said it was notifying the Job
> Manager:
> >
> > 2020-04-06 08:21:04.329 [flink-akka.actor.default-dispatcher-55283]
> level=INFO org.apache.flink.runtime.taskexecutor.TaskExecutor -
> Un-registering task and sending final execution state FAILED to JobManager
> for task FOG_PREDICTION_FUNCTION 3086efd0e57612710d0ea74138c01090.
> >
> > But I see no evidence that the Job Manager got the message. I would
> expect with this type of failure that the Job Manager would restart the
job. In this case, the job carried on, hobbled, until it stopped
> processing data and our user had to manually restart the job. The job also
> started experiencing checkpoint timeouts on every checkpoint due to this
> operator stopping.
> >
> > Had the job restarted when this happened, I believe everything would
> have been ok as the job had an appropriate restart strategy in place. The
> Task Manager that this task was running on remained healthy and was
> actively processing o

Re: Flink job didn't restart when a task failed

2020-04-13 Thread Zhu Zhu
Sorry for not following this ML earlier.

I think the cause might be that the final state ('FAILED') update message
to the JM was lost. In that case the TaskExecutor simply fails the task
(which has no effect here, since the task is already FAILED) and does not
send the task state again.
@Bruce, would you take a look at the TM log? If this guess is right, the task
manager logs will contain a line like "Task {} is already in state FAILED."

Thanks,
Zhu Zhu

On Fri, Apr 10, 2020 at 12:51 AM Till Rohrmann wrote:

> For future reference, here is the issue to track the reconciliation logic
> [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-17075
>
> Cheers,
> Till
>
> On Thu, Apr 9, 2020 at 6:47 PM Till Rohrmann  wrote:
>
>> Hi Bruce,
>>
>> what you are describing sounds indeed quite bad. Quite hard to say
>> whether we fixed such an issue in 1.10. It is definitely worth a try to
>> upgrade, though.
>>
>> In order to further debug the problem, it would be really great if you
>> could provide us with the log files of the JobMaster and the TaskExecutor.
>> Ideally on debug log level if you have them.
>>
>> One thing which we wanted to add is sending the current task statuses as
>> part of the heartbeat from the TM to the JM. Having this information would
>> allow us to reconcile a situation like you are describing.
>>
>> Cheers,
>> Till
>>
>> On Thu, Apr 9, 2020 at 1:57 PM Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>>
>>> this indeed seems very strange!
>>>
>>> @Gary Could you maybe have a look at this since you work/worked quite a
>>> bit on the scheduler?
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 09.04.20 05:46, Hanson, Bruce wrote:
>>> > Hello Flink folks:
>>> >
>>> > We had a problem with a Flink job the other day that I haven’t seen
>>> before. One task encountered a failure and switched to FAILED (see the full
>>> exception below). After the failure, the task said it was notifying the Job
>>> Manager:
>>> >
>>> > 2020-04-06 08:21:04.329 [flink-akka.actor.default-dispatcher-55283]
>>> level=INFO org.apache.flink.runtime.taskexecutor.TaskExecutor -
>>> Un-registering task and sending final execution state FAILED to JobManager
>>> for task FOG_PREDICTION_FUNCTION 3086efd0e57612710d0ea74138c01090.
>>> >
>>> > But I see no evidence that the Job Manager got the message. I would
>>> expect with this type of failure that the Job Manager would restart the
>>> job. In this case, the job carried on, hobbled, until it stopped
>>> processing data and our user had to manually restart the job. The job also
>>> started experiencing checkpoint timeouts on every checkpoint due to this
>>> operator stopping.
>>> >
>>> > Had the job restarted when this happened, I believe everything would
>>> have been ok as the job had an appropriate restart strategy in place. The
>>> Task Manager that this task was running on remained healthy and was
>>> actively processing other tasks.
>>> >
>>> > It seems like this is some kind of a bug. Is this something anyone has
>>> seen before? Could it be something that has been fixed if we went to Flink
>>> 1.10?
>>> >
>>> > We are running Flink 1.7.2. I know it’s rather old now. We run a
>>> managed environment where users can run their jobs, and are in the process
>>> of upgrading to 1.10.
>>> >
>>> > This is the full exception that started the problem:
>>> >
>>> > 2020-04-06 08:21:04.297 [FOG_PREDICTION_FUNCTION (15/20)] level=INFO
>>> org.apache.flink.runtime.taskmanager.Task  - FOG_PREDICTION_FUNCTION
>>> (15/20) (3086efd0e57612710d0ea74138c01090) switched from RUNNING to FAILED.
>>> > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>>> Connection timed out (connection to '/100.112.98.121:36256')
>>> > at
>>> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:165)
>>> > at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
>>> > at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
>>> > at
>>> org.apache.flink.shade

Re: Issue with Could not resolve ResourceManager address akka.tcp://flink

2020-03-26 Thread Zhu Zhu
Hi Vitaliy,

>> *Cannot serve slot request, no ResourceManager connected*
This is not a problem in itself. It only means that the JM needs a
connection to the RM before it can send slot requests.

>> *Could not resolve ResourceManager address
akka.tcp://flink@prod-bigd-dn11:43757/user/resourcemanager*
This should be the root cause. Would you check whether the hostname
*prod-bigd-dn11* is resolvable, and whether port 43757 on that machine
is accessible?
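
Both points can be checked from the machine running the JobManager with a few lines of plain Java. This is only a sketch built on the JDK's `InetAddress` and `Socket` classes; the class name `ReachabilityCheck` and its method names are made up for this example and are not something Flink provides.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;

// Hypothetical helper, not part of Flink: checks name resolution and TCP reachability.
class ReachabilityCheck {

    // Returns true if the hostname can be resolved to an IP address.
    static boolean isResolvable(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers unresolvable hosts, refused connections and timeouts alike.
            return false;
        }
    }
}
```

If name resolution and the firewall are set up correctly, `ReachabilityCheck.isResolvable("prod-bigd-dn11")` and `ReachabilityCheck.isPortOpen("prod-bigd-dn11", 43757, 2000)` should both return true while the ResourceManager is running.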

Thanks,
Zhu Zhu

On Fri, Mar 27, 2020 at 1:54 AM Vitaliy Semochkin wrote:

> Hi,
>
> I'm facing an issue similar to
> https://issues.apache.org/jira/browse/FLINK-14074
> Job starts and then yarn logs report "*Could not resolve ResourceManager
> address akka.tcp://flink*"
>
> A fragment from yarn logs looks like this:
>
> LazyFromSourcesSchedulingStrategy]
> 16:54:21,279 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>- Job Flink Java Job at Thu Mar 26 16:54:09 CET 2020
> (9817283f911d83a6d278cc39d17d6b11) switched from state CREATED to RUNNING.
> 16:54:21,287 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>- CHAIN DataSource (MailEvent; EMC; 2019-12-01 - 2020-01-01; null -
> 157818240) -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) ->
> Filter (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) ->
> FlatMap (Get mail item by EMC event) -> Map (Map IntraregionalVolumeItem
> data set from EMC events) (1/3) (5482b0e6ae1d64d9b0918ec15599211f) switched
> from CREATED to SCHEDULED.
> 16:54:21,287 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>- CHAIN DataSource (MailEvent; EMC; 2019-12-01 - 2020-01-01; null -
> 157818240) -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) ->
> Filter (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) ->
> FlatMap (Get mail item by EMC event) -> Map (Map IntraregionalVolumeItem
> data set from EMC events) (2/3) (5c993710423eea47ae66f833b2999530) switched
> from CREATED to SCHEDULED.
> 16:54:21,287 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>- CHAIN DataSource (MailEvent; EMC; 2019-12-01 - 2020-01-01; null -
> 157818240) -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) ->
> Filter (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) ->
> FlatMap (Get mail item by EMC event) -> Map (Map IntraregionalVolumeItem
> data set from EMC events) (3/3) (23cfa30fba857b2c75ba76a21c7d4972) switched
> from CREATED to SCHEDULED.
> 16:54:21,287 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>- CHAIN DataSource (MailEvent; EMD; 2019-12-01 - 2020-01-01; null -
> 157818240) -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) ->
> Filter (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) ->
> FlatMap (Get mail item by EMD event) -> Map (Map IntraregionalVolumeItem
> data set from EMD events) (1/3) (7cc8a395b87e82000184724eb1698ace) switched
> from CREATED to SCHEDULED.
> 16:54:21,288 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>- CHAIN DataSource (MailEvent; EMD; 2019-12-01 - 2020-01-01; null -
> 157818240) -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) ->
> Filter (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) ->
> FlatMap (Get mail item by EMD event) -> Map (Map IntraregionalVolumeItem
> data set from EMD events) (2/3) (5edfe3d1f509856d17fa0da078cb3f7e) switched
> from CREATED to SCHEDULED.
> 16:54:21,288 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
>- CHAIN DataSource (MailEvent; EMD; 2019-12-01 - 2020-01-01; null -
> 157818240) -> FlatMap (SplitDuplicate) -> FlatMap (Create MailEvent) ->
> Filter (EventDateTimeRangeFilter) -> Filter (TrackingStatusesFilter) ->
> FlatMap (Get mail item by EMD event) -> Map (Map IntraregionalVolumeItem
> data set from EMD events) (3/3) (dd3397f889a3fad1acf4c59f59a93d92) switched
> from CREATED to SCHEDULED.
> 16:54:21,297 INFO
>  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  - Cannot
> serve slot request, no ResourceManager connected. Adding as pending request
> [SlotRequestId{b4c6e7357e4620bf2e997c46d7723eb1}]
> 16:54:21,301 INFO
>  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  - Cannot
> serve slot request, no ResourceManager connected. Adding as pending request
> [SlotRequestId{841bbb79b01b5e0d9ae749a03f65c303}]
> 16:54:21,301 INFO
>  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  - Cannot
> serve slot request, no ResourceManager connected. Adding as pending request
> [SlotRequestId{496120465d541ea9fd2ffcec89e2ac3b}]
> 16:54:21,304 INFO  org.apache.flink.runtime.jobmaster.JobMas

Re: How to test flink job recover from checkpoint

2020-03-04 Thread Zhu Zhu
Hi Eleanore,

You can change your application tasks to throw exceptions at a certain
frequency.
Alternatively, if the application has external dependencies (e.g. a source),
you can trigger failures manually by manipulating the status of the
external service (e.g. shut down the source service, or break the network
connection between the Flink app and the source service).
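
A minimal sketch of the first option, written in plain Java so that it runs without a Flink dependency: a small helper that throws on every n-th record. The class name `FailEveryN` is made up for this illustration; inside a job you would call `maybeFail()` from the `map()` or `processElement()` method of one of your functions.

```java
// Hypothetical helper for tests, not a Flink class: fails on every n-th call.
class FailEveryN {
    private final int n;
    private long seen = 0;

    FailEveryN(int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        this.n = n;
    }

    // Call once per processed record; throws on records n, 2n, 3n, ...
    void maybeFail() {
        seen++;
        if (seen % n == 0) {
            throw new RuntimeException("Injected failure at record " + seen);
        }
    }
}
```

Note that the counter here is an ordinary field, not checkpointed state, so it starts from zero again after every recovery. With a restart strategy that allows only a few attempts, pick `n` large enough that the job is not failed permanently before you have verified the restore.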

Thanks,
Zhu Zhu

On Thu, Mar 5, 2020 at 8:40 AM Eleanore Jin wrote:

> Hi,
>
> I have a flink application and checkpoint is enabled, I am running locally
> using miniCluster.
>
> I just wonder if there is a way to simulate the failure, and verify that
> flink job restarts from checkpoint?
>
> Thanks a lot!
> Eleanore
>


Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

2020-02-23 Thread Zhu Zhu
Congratulations Jingsong!

Thanks,
Zhu Zhu

On Sat, Feb 22, 2020 at 1:30 AM Fabian Hueske wrote:

> Congrats Jingsong!
>
> Cheers, Fabian
>
> On Fri, Feb 21, 2020 at 5:49 PM Rong Rong wrote:
>
> > Congratulations Jingsong!!
> >
> > Cheers,
> > Rong
> >
> > On Fri, Feb 21, 2020 at 8:45 AM Bowen Li  wrote:
> >
> > > Congrats, Jingsong!
> > >
> > > On Fri, Feb 21, 2020 at 7:28 AM Till Rohrmann 
> > > wrote:
> > >
> > >> Congratulations Jingsong!
> > >>
> > >> Cheers,
> > >> Till
> > >>
> > >> On Fri, Feb 21, 2020 at 4:03 PM Yun Gao  wrote:
> > >>
> > >>>   Congratulations Jingsong!
> > >>>
> > >>>Best,
> > >>>Yun
> > >>>
> > >>> --
> > >>> From:Jingsong Li 
> > >>> Send Time:2020 Feb. 21 (Fri.) 21:42
> > >>> To:Hequn Cheng 
> > >>> Cc:Yang Wang ; Zhijiang <
> > >>> wangzhijiang...@aliyun.com>; Zhenghua Gao ;
> godfrey
> > >>> he ; dev ; user <
> > >>> user@flink.apache.org>
> > >>> Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
> > >>>
> > >>> Thanks everyone~
> > >>>
> > >>> It's my pleasure to be part of the community. I hope I can make a
> > better
> > >>> contribution in future.
> > >>>
> > >>> Best,
> > >>> Jingsong Lee
> > >>>
> > >>> On Fri, Feb 21, 2020 at 2:48 PM Hequn Cheng 
> wrote:
> > >>> Congratulations Jingsong! Well deserved.
> > >>>
> > >>> Best,
> > >>> Hequn
> > >>>
> > >>> On Fri, Feb 21, 2020 at 2:42 PM Yang Wang 
> > wrote:
> > >>> Congratulations!Jingsong. Well deserved.
> > >>>
> > >>>
> > >>> Best,
> > >>> Yang
> > >>>
> > >>> On Fri, Feb 21, 2020 at 1:18 PM Zhijiang wrote:
> > >>> Congrats Jingsong! Welcome on board!
> > >>>
> > >>> Best,
> > >>> Zhijiang
> > >>>
> > >>> --
> > >>> From:Zhenghua Gao 
> > >>> Send Time:2020 Feb. 21 (Fri.) 12:49
> > >>> To:godfrey he 
> > >>> Cc:dev ; user 
> > >>> Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
> > >>>
> > >>> Congrats Jingsong!
> > >>>
> > >>>
> > >>> *Best Regards,*
> > >>> *Zhenghua Gao*
> > >>>
> > >>>
> > >>> On Fri, Feb 21, 2020 at 11:59 AM godfrey he 
> > wrote:
> > >>> Congrats Jingsong! Well deserved.
> > >>>
> > >>> Best,
> > >>> godfrey
> > >>>
> > >>> On Fri, Feb 21, 2020 at 11:49 AM Jeff Zhang wrote:
> > >>> Congratulations!Jingsong. You deserve it
> > >>>
> > >>> On Fri, Feb 21, 2020 at 11:43 AM wenlong.lwl wrote:
> > >>> Congrats Jingsong!
> > >>>
> > >>> On Fri, 21 Feb 2020 at 11:41, Dian Fu  wrote:
> > >>>
> > >>> > Congrats Jingsong!
> > >>> >
> > >>> > > On Feb 21, 2020, at 11:39 AM, Jark Wu wrote:
> > >>> > >
> > >>> > > Congratulations Jingsong! Well deserved.
> > >>> > >
> > >>> > > Best,
> > >>> > > Jark
> > >>> > >
> > >>> > > On Fri, 21 Feb 2020 at 11:32, zoudan  wrote:
> > >>> > >
> > >>> > >> Congratulations! Jingsong
> > >>> > >>
> > >>> > >>
> > >>> > >> Best,
> > >>> > >> Dan Zou
> > >>> > >>
> > >>> >
> > >>> >
> > >>>
> > >>>
> > >>> --
> > >>> Best Regards
> > >>>
> > >>> Jeff Zhang
> > >>>
> > >>>
> > >>>
> > >>> --
> > >>> Best, Jingsong Lee
> > >>>
> > >>>
> > >>>
> >
>


Re: TaskManager Fail when I cancel the job and crash

2020-02-16 Thread Zhu Zhu
Hi Soheil,

I think the root cause is that during cancellation the task was stuck in

*org.postgresql.jdbc.PgStatement.killTimerTask(PgStatement.java:999)*


The taskmanager process exit is expected in this case, to enforce a failure
and recovery.
To be specific, when a task on the TM is to be canceled, a
*TaskCancelerWatchDog* is started to watch the cancellation.
If the cancellation times out, the watchdog triggers a fatal error to
force the TM to exit.

I think you may need to diagnose why the postgresql call took so long to
flush data.
Alternatively, if the long flushing time is expected, you can increase
the cancellation timeout ("task.cancellation.timeout") to avoid this issue.
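
The watchdog idea can be sketched in plain Java as follows. This is a simplified illustration, not Flink's actual *TaskCancelerWatchDog* code: the class `CancellationWatchdog` and its `onFatalError` callback are invented names, and where the TM escalates to a fatal error and exits, the sketch merely runs a callback.

```java
// Simplified illustration only; names are hypothetical and this is not Flink's code.
class CancellationWatchdog implements Runnable {
    private final Thread taskThread;      // thread that is being cancelled
    private final long timeoutMillis;     // how long to wait for it to finish (must be > 0)
    private final Runnable onFatalError;  // what to do if it does not finish in time

    CancellationWatchdog(Thread taskThread, long timeoutMillis, Runnable onFatalError) {
        this.taskThread = taskThread;
        this.timeoutMillis = timeoutMillis;
        this.onFatalError = onFatalError;
    }

    @Override
    public void run() {
        try {
            // Wait up to timeoutMillis for the task thread to terminate.
            taskThread.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        // Still alive after the timeout: the cancellation is stuck, so escalate.
        if (taskThread.isAlive()) {
            onFatalError.run();
        }
    }
}
```

A task thread blocked in a long JDBC batch, as in your stack trace, would still be alive when the timeout elapses, which is the situation where the escalation kicks in. A larger timeout simply gives the blocked call more time to return.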

Thanks,
Zhu Zhu

On Sat, Feb 15, 2020 at 6:41 AM Soheil Pourbafrani wrote:

> Hi,
>
> I developed a single Flink job that read a huge amount of files and after
> some simple preprocessing, sink them into the database. I use the built-in
> JDBCOutputFormat for inserting records into the database. The problem is
> when I cancel the job using either the WebUI or the command line, the job
> did not cancel completely and finally, the taskmanager process crashes!
> Here are the taskmanager logs (generated continuously for some seconds):
>
> 2020-02-15 01:17:17,208 WARN
>>  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator
>>  - The reader is stuck in method:
>>  java.lang.Object.wait(Native Method)
>> org.postgresql.jdbc.PgStatement.killTimerTask(PgStatement.java:999)
>> org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:856)
>>
>> org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1546)
>>
>> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:216)
>>
>> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:210)
>>
>> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:41)
>>
>> org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:86)
>>
>> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
>>
>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
>>
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>>
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>> 2020-02-15 01:17:17,224 INFO
>>  akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting
>> shut down.
>> 2020-02-15 01:17:17,225 INFO
>>  akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting
>> shut down.
>
>
> I'm using the
> Flink: 1.7.2,
> java: Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
>
> Any help will be appreciated.
>
> All the best,
> Soheil
>


Re: [ANNOUNCE] Apache Flink 1.10.0 released

2020-02-12 Thread Zhu Zhu
Cheers!
Thanks Gary and Yu for the great job as release managers.
And thanks to everyone whose contribution makes the release possible!

Thanks,
Zhu Zhu

On Wed, Feb 12, 2020 at 9:36 PM Wyatt Chun wrote:

> Sounds great. Congrats & Thanks!
>
> On Wed, Feb 12, 2020 at 9:31 PM Yu Li  wrote:
>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.10.0, which is the latest major release.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the
>> improvements for this new major release:
>> https://flink.apache.org/news/2020/02/11/release-1.10.0.html
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12345845
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Cheers,
>> Gary & Yu
>>
>


Re: [ANNOUNCE] Dian Fu becomes a Flink committer

2020-01-16 Thread Zhu Zhu
Congratulations Dian.

Thanks,
Zhu Zhu

On Fri, Jan 17, 2020 at 10:01 AM hailongwang <18868816...@163.com> wrote:

>
> Congratulations Dian !
>
> Best,
> Hailong Wang
>
>
>
>
> At 2020-01-16 21:15:34, "Congxian Qiu" wrote:
>
> Congratulations Dian Fu
>
> Best,
> Congxian
>
>
> On Thu, Jan 16, 2020 at 7:44 PM Jark Wu wrote:
>
>> Congratulations Dian and welcome on board!
>>
>> Best,
>> Jark
>>
>> On Thu, 16 Jan 2020 at 19:32, Jingsong Li  wrote:
>>
>> > Congratulations Dian Fu. Well deserved!
>> >
>> > Best,
>> > Jingsong Lee
>> >
>> > On Thu, Jan 16, 2020 at 6:26 PM jincheng sun 
>> > wrote:
>> >
>> >> Congrats Dian Fu and welcome on board!
>> >>
>> >> Best,
>> >> Jincheng
>> >>
>> >> On Thu, Jan 16, 2020 at 6:22 PM Shuo Cheng wrote:
>> >>
>> >>> Congratulations!  Dian Fu
>> >>>
>> >>> > On Thu, Jan 16, 2020 at 6:13 PM Xingbo Wei Zhong wrote:  On Thu, Jan 16, 2020 at
>> >>> 5:58 PM jincheng sun wrote:
>> >>>
>> >>
>> >
>> > --
>> > Best, Jingsong Lee
>> >
>>
>
>
>
>
>


Re: How to assign a UID to a KeyedStream?

2020-01-09 Thread Zhu Zhu
Hi Ken,

This is actually a bug: a Partition should not require a UID. It is
fixed in 1.9.2 and 1.10; see FLINK-14910
<https://jira.apache.org/jira/browse/FLINK-14910>.

Thanks,
Zhu Zhu

On Fri, Jan 10, 2020 at 7:51 AM Ken Krugler wrote:

> Hi all,
>
> [Of course, right after hitting send I realized I could just do
> rides.getTransformation().setUid("blah"), ditto for the fares stream. Might
> be something to add to the docs, or provide a .uid() method on KeyedStreams
> for syntactic sugar]
>
> Just for grins, I disabled auto-generated UIDs for the taxi rides/fares
> state example in the online tutorial.
>
> env.getConfig().disableAutoGeneratedUIDs();
>
> I then added UIDs for all operators, sources & sinks. But I still get the
> following when calling env.getExecutionPlan() or env.execute():
>
> java.lang.IllegalStateException: Auto generated UIDs have been disabled
> but no UID or hash has been assigned to operator Partition
> at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:297)
> at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformTwoInputTransform(StreamGraphGenerator.java:682)
> at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:252)
> at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1529)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionPlan(StreamExecutionEnvironment.java:1564)
> at com.citi.flink.RidesAndFaresTool.main(RidesAndFaresTool.java:63)
>
> The simple workflow is:
>
> DataStream<TaxiRide> rides = env
> .addSource(new CheckpointedTaxiRideSource(ridesFile,
> servingSpeedFactor))
> .uid("source: taxi rides")
> .name("taxi rides")
> .filter((TaxiRide ride) -> ride.isStart)
> .uid("filter: only start rides")
> .name("only start rides")
> .keyBy((TaxiRide ride) -> ride.rideId);
>
> DataStream<TaxiFare> fares = env
> .addSource(new CheckpointedTaxiFareSource(faresFile,
> servingSpeedFactor))
> .uid("source: taxi fares")
> .name("taxi fares")
> .keyBy((TaxiFare fare) -> fare.rideId);
>
> DataStreamSink> enriched = rides
> .connect(fares)
> .flatMap(new EnrichmentFunction())
> .uid("function: enrich rides with fares")
> .name("enrich rides with fares")
> .addSink(sink)
> .uid("sink: enriched taxi rides")
> .name("enriched taxi rides");
>
> Internally the exception is thrown when the EnrichFunction (a
> RichCoFlatMapFunction) is being transformed by
> StreamGraphGenerator.transformTwoInputTransform().
>
> This calls StreamGraphGenerator.transform() with the two inputs, but the
> Transformation for each input is a PartitionTransformation.
>
> I don’t see a way to set the UID following the keyBy(), as a KeyedStream
> creates the PartitionTransformation without a UID.
>
> Any insight into setting the UID properly here? Or should
> StreamGraphGenerator.transform() skip the no-uid check for
> PartitionTransformation, since that’s not an operator with state?
>
> Thanks,
>
> — Ken
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>
>


Re: Flink Job claster scalability

2020-01-09 Thread Zhu Zhu
Hi KristoffSC,

Did you increase the parallelism of the vertex that has the largest
parallelism?
Or did you explicitly put tasks into different slot sharing groups?
With the default slot sharing, the number of slots required/used equals
the max parallelism of a JobVertex, which is 6 in your case.

On Thu, Jan 9, 2020 at 9:26 PM KristoffSC wrote:

> Thank you David and Zhu Zhu,
> this helps a lot.
>
> I have follow up questions though.
>
> Having this
> /"Instead the Job must be stopped via a savepoint and restarted with a new
> parallelism"/
>
> and slot sharing [1] feature, I got the impression that if I would start my
> cluster with more than 6 task slots, Flink will try deploy tasks across all
> resources, trying to use all available resources during job submission
>
> I did a two tests having my original task.
> 1. I started a Job Cluster with 7 task slots (7 task manager since in this
> case 1 task manager has one task slot).
> 2. I started a Session cluster with 28 task slots in total. In this case I
> had 7 task managers, 4 task slot each.
>
> For case 1, I use "FLINK_JOB" variable as stated in [2]. For case 2, I
> submitted my job from UI after Flink started to be operative.
>
>
> For both cases it used only 6 task slots, so it was still reusing task
> slots. I got the impression that it will try to use as much available
> resources as it can.
>
> What do you think about this?
>
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#task-slots-and-resources
> [2]
>
> https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md
>
>
>
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink Job claster scalability

2020-01-08 Thread Zhu Zhu
Hi KristoffSC,

Each task needs a slot to run. However, Flink enables slot sharing[1] by
default so that one slot can host one parallel instance of each task in a
job. That's why your job can start with 6 slots.
However, different parallel instances of the same task cannot share a slot.
That's why you need at least 6 slots to run your job.

You can put tasks into different slot sharing groups via
'.slotSharingGroup(xxx)' to force certain tasks not to share slots. This
keeps those tasks from burdening each other. However, the job will then
need more slots to start.
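
As a rough model of the rule above: within one slot sharing group the job needs as many slots as the highest parallelism in that group, and separate groups add up. The sketch below is plain Java with invented names (`SlotEstimate`, `requiredSlots`), not a Flink API, and it ignores special cases such as co-location constraints.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Back-of-the-envelope model, not a Flink API: estimates the slots a job needs.
class SlotEstimate {

    // One entry per task (job vertex): its slot sharing group and its parallelism.
    static final class Vertex {
        final String group;
        final int parallelism;

        Vertex(String group, int parallelism) {
            this.group = group;
            this.parallelism = parallelism;
        }
    }

    // Slots needed = sum over slot sharing groups of the max parallelism in each group.
    static int requiredSlots(List<Vertex> vertices) {
        Map<String, Integer> maxPerGroup = new HashMap<>();
        for (Vertex v : vertices) {
            maxPerGroup.merge(v.group, v.parallelism, Integer::max);
        }
        int total = 0;
        for (int max : maxPerGroup.values()) {
            total += max;
        }
        return total;
    }
}
```

With made-up parallelisms of 1, 6 and 4 all in the default group, the estimate is 6 slots. Moving the parallelism-4 task into its own group raises it to 6 + 4 = 10.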

So for your questions:
#1 yes
#2 At the moment, you will need to resubmit your job with the adjusted
parallelism. The rescale CLI was experimental and was temporarily removed [2]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html#task-slots-and-resources
[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-Temporarily-remove-support-for-job-rescaling-via-CLI-action-quot-modify-quot-td27447.html

Thanks,
Zhu Zhu

On Thu, Jan 9, 2020 at 1:05 AM KristoffSC wrote:

> Hi all,
> I must say I'm very impressed by Flink and what it can do.
>
> I was trying to play around with Flink operator parallelism and scalability
> and I have few questions regarding this subject.
>
> My setup is:
> 1. Flink 1.9.1
> 2. Docker Job Cluster, where each Task manager has only one task slot. I'm
> following [1]
> 3. env setup:
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
> env.setParallelism(1);
> env.setMaxParallelism(128);
> env.enableCheckpointing(10 * 60 * 1000);
>
> Please mind that I am using operator chaining here.
>
> My pipeline setup:
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture2.png>
>
>
>
> As you can see I have 7 operators (few of them were actually chained and
> this is ok), with different parallelism level. This all gives me 23 tasks
> total.
>
>
> I've noticed that with "one task manager = one task slot" approach I have
> to
> have 6 task slots/task managers to be able to start this pipeline.
>
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture1.png>
>
>
> If number of task slots is lower than 6, job is scheduled but not started.
>
> With 6 task slots everything is working fine and I must say that I'm very
> impressed with the way Flink balanced data between task slots. Data was
> distributed very evenly between operator instances/tasks.
>
> In this setup (7 operators, 23 tasks and 6 task slots), some task slots
> have
> to be reused by more than one operator. While inspecting UI I've found
> examples such operators. This is what I was expecting though.
>
> However I was surprised a little bit after I added one additional task
> manager (hence one new task slot)
>
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture3.png>
>
>
> After adding new resources, Flink did not rebalance/redistribute the
> graph. So this host was sitting there and doing nothing. Even after putting
> some load on the cluster, this node was still not used.
>
>
> *After doing this exercise I have few questions:*
>
> 1. It seems that number of task slots must be equal or greater than max
> number of parallelism used in the pipeline. In my case it was 6. When I
> changed parallelism for one of the operator to 7, I had to have 7 task
> slots
> (task managers in my setup) to be able to even start the job.
> Is this the case?
>
> 2. What can I do to use the extra node that was spawned while the job was
> running?
> In other words, if I see that one of my nodes has too much load, what can
> I do? Please mind that I'm using a keyBy/hashing function in my pipeline
> and in my tests I had around 5000 unique keys.
>
> I tried to use the REST API to call "rescale" but I got this response:
> /302{"errors":["Rescaling is temporarily disabled. See FLINK-12312."]}/
>
> Thanks.
>
> [1]
>
> https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: How long Flink state default TTL,if I don't config the state ttl config?

2020-01-05 Thread Zhu Zhu
Yes. State TTL is by default disabled.

Thanks,
Zhu Zhu

On Mon, Jan 6, 2020 at 10:09 AM LakeShen wrote:

> I looked at the Flink source code and found that by default the state TTL
> never expires. Is that right?
>
> On Mon, Jan 6, 2020 at 9:58 AM LakeShen wrote:
>
>> Hi community, I have a question about Flink state TTL. If I don't set
>> a state TTL config,
>> how long is the Flink state retained? Is it retained forever in HDFS?
>> Thanks for your reply.
>>
>


Re: Rich Function Thread Safety

2019-12-18 Thread Zhu Zhu
Hi Aaron,

It is thread safe since the state snapshot happens in the same thread as
the user function.
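
A toy model of why this holds, in plain Java: records and checkpoint triggers go through one queue that a single thread drains, so a snapshot can only run between two complete invocations of the user function. All names here (`SingleThreadedTask`, `Event`) are invented for the illustration, and this is not how Flink's task classes are actually structured.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model only, with hypothetical names; not Flink's real task implementation.
class SingleThreadedTask {
    // Either a record to process or a request to take a snapshot.
    enum Event { RECORD, CHECKPOINT }

    private int a = 0;
    private int b = 1;
    private final List<int[]> snapshots = new ArrayList<>();

    // The "user function": updates A and then B.
    private void processRecord() {
        a = 5;
        // No snapshot can happen here, because the same thread takes the snapshots.
        b = 10;
    }

    // Copies the current state; only ever called between two processRecord() calls.
    private void snapshot() {
        snapshots.add(new int[] {a, b});
    }

    // Drains the queue on the calling thread, one event at a time.
    List<int[]> run(Queue<Event> events) {
        Event e;
        while ((e = events.poll()) != null) {
            if (e == Event.RECORD) {
                processRecord();
            } else {
                snapshot();
            }
        }
        return snapshots;
    }
}
```

In this model a snapshot sees either (A = 0, B = 1) or (A = 5, B = 10), never the torn (A = 5, B = 1) from your unhappy path.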

Thanks,
Zhu Zhu

On Thu, Dec 19, 2019 at 11:25 AM Aaron Langford wrote:

> Hello Flink Community,
>
> I'm hoping to verify some understanding:
>
> If I have a function with managed state, I'm wondering if a
> checkpoint will ever be taken while a function is mutating state. I'll try
> to illustrate the situation I'm hoping to be safe from:
>
> Happy Path:
> t0 -> processFunction invoked with el1
> t1 -> set A to 5
> t2 -> set B to 10
> t3 -> function returns
>
> Unhappy path:
> t0 -> processFunction invoked with el1
> t1 -> set A to 5
> t2 -> function interrupted, checkpoint taken (A = 5, B = 1)
> t3 -> set B to 10
> t4 -> function returns
> ...
> tn -> flink application fails, restart from prev checkpoint (A=5, B=1)
> tn+1 -> recovery begins somewhere, but state is torn anyway, so we're
> going to have a bad time
>
> I don't think this could happen given that checkpoints effectively are
> messages in the pipeline, and the checkpoint is only taken when an operator
> sees the checkpoint barrier.
>
> Hoping to make sure this is correct!
>
> Aaron
>


Re: Deploying Operator on concrete Task Manager

2019-12-18 Thread Zhu Zhu
Hi KristoffSC,

Flink does not support specifying the TM for tasks.
So I think you need to launch a separate job to do the "AsyncCall + map" in
the secured zone.

Thanks,
Zhu Zhu

On Wed, Dec 18, 2019 at 8:04 PM KristoffSC wrote:

> Hi,
> I have a question regarding job/operator deployment on Task Managers.
>
> If I understand correctly, my job will be split into individual tasks,
> which will be "deployed and executed" on particular task slot/s of Task
> Manager (depending on parallelism level of course).
>
> Let's imagine I have a Job that has:
>
> 1. Kafka Source with map to RawEvent
> 2. Enrichment that has AsyncCall + map to EnrichedEvent
> 3. Key stream with another Map/Process functions maybe with some Windowing.
> 4. Sink
>
> I have a grid that has 1 Job Manager and 3 Task Manager nodes.
> For security reasons, one Task Manager should be in a special network zone
> (red zone).
>
> The point 2 of my Job (Enrichment that has AsyncCall + map to
> EnrichedEvent)
> should be executed on that particular node that is located in a Secured
> Zone.
>
> All other operations should not be put on this node.
> Is there a way to configure this?
>
>
> As an alternative I can see that we could have a separate job that will
> just
> have a source, enrich, sink and this job will be "deployed" on this Secured
> Task Manager. Or maybe we should have separate Flink cluster for this?
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: How to reprocess certain events in Flink?

2019-12-18 Thread Zhu Zhu
Hi Pooja,

My main confusion is this: if two events have the same transaction_id, how
can we tell whether the second one is a wanted value update or an unwanted
duplicate?

MapState with a TTL can be used for deduplication, assuming that a
duplicate never arrives too long after the original event was processed.
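
As a rough illustration of the TTL idea, here is a minimal sketch in plain
Java. It is not Flink API: the class TtlDeduplicator and its method are
made-up names, and the in-memory map only stands in for keyed state. In a
real job the map would be Flink keyed state (e.g. MapState) with state TTL
enabled, so that expiry is handled by the state backend.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java illustration of TTL-based de-duplication (hypothetical class, not Flink API). */
class TtlDeduplicator {
    private final long ttlMillis;
    // transaction_id -> time at which it was first seen
    private final Map<String, Long> firstSeenAt = new HashMap<>();

    TtlDeduplicator(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Returns true if the id has not been seen within the TTL, false for a duplicate. */
    boolean isFirstOccurrence(String transactionId, long nowMillis) {
        // Drop expired ids so the map stays bounded. A linear scan is fine for a
        // sketch; a state backend with TTL would expire entries for you.
        firstSeenAt.values().removeIf(seenAt -> nowMillis - seenAt >= ttlMillis);
        return firstSeenAt.putIfAbsent(transactionId, nowMillis) == null;
    }

    public static void main(String[] args) {
        TtlDeduplicator dedup = new TtlDeduplicator(1_000L);
        System.out.println(dedup.isFirstOccurrence("tx-1", 0L));     // first sighting
        System.out.println(dedup.isFirstOccurrence("tx-1", 500L));   // repeat within the TTL
        System.out.println(dedup.isFirstOccurrence("tx-1", 1_500L)); // repeat after the TTL
    }
}
```

The trade-off is the one stated above: a duplicate that arrives after the
TTL has elapsed is treated as a new event.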

Thanks,
Zhu Zhu

On Wed, Dec 18, 2019 at 3:50 PM Rafi Aroch wrote:

> Hi Pooja,
>
> Here's an implementation from Jamie Grier for de-duplication using
> in-memory cache with some expiration time:
>
> https://github.com/jgrier/FilteringExample/blob/master/src/main/java/com/dataartisans/DedupeFilteringJob.java
>
> If for your use-case you can limit the time period where duplications may
> happen, you can use this approach.
>
> Thanks,
> Rafi
>
>
> On Wed, Dec 18, 2019 at 8:16 AM Pooja Agrawal 
> wrote:
>
>> Hey,
>>
>> I am sorry for the confusion. So, the value is not already present in the
>> event. We are reading it from a static table (kind of data enrichment in
>> flink job). Above event is an enriched event.
>> If we say that this is some transaction event, the user would have done
>> the transaction once and hence the transaction_id is unique. But, the table
>> from where we are reading the value may contain the wrong value (not
>> always, sometimes because of bug). In this case, we may want to reprocess
>> that transaction event with new value (here, the transaction_id will be
>> same as previous, but the value will change). I hope this clears the
>> scenario. Let me know if you have any other questions.
>>
>> To solve the idempotency problem, you suggested to maintain a set
>> recording transaction_id(s). Since, we are aggregating over all events seen
>> till now, the number of events and hence ids will be too large. I am
>> assuming we will need to have some external store here and do a lookup
>> every time we process an event. This may increase the latency. Can you
>> suggest the efficient way to solve this? and if we need to have an external
>> store, what will be the best candidate?
>>
>> Thanks
>> Pooja
>>
>>
>>
>> On Wed, Dec 18, 2019 at 8:19 AM Zhu Zhu  wrote:
>>
>>> Hi Pooja,
>>>
>>> I'm a bit confused since in 1) it says that "If two events have same
>>> transaction_id, we can say that they are duplicates", and in 2) it says
>>> that "Since this is just a value change, the transaction_id will be same".
>>> Looks to me they are conflicting. Usually in case 2) scenarios, the value
>>> updates event is considered as new event which does not share the unique id
>>> with prior events.
>>>
>>> If each event has a unique transaction_id, you can use it to
>>> de-duplicate the events via a set recording the transaction_id(s) which are
>>> already processed. And then 2) would not be a problem with the unique
>>> transaction_id assumption.
>>>
>>> Thanks,
>>> Zhu Zhu
>>>
>>> On Tue, Dec 17, 2019 at 9:17 PM Pooja Agrawal wrote:
>>>
>>>>
>>>> Hi,
>>>>
>>>> I have a use case where we are reading events from kinesis stream.The
>>>> event can look like this
>>>> Event {
>>>>   event_id,
>>>>   transaction_id
>>>>   key1,
>>>>   key2,
>>>>   value,
>>>>   timestamp,
>>>>   some other fields...
>>>> }
>>>>
>>>> We want to aggregate the values per key for all events we have seen
>>>> till now (as simple as "select key1, key2, sum(value) from events group by
>>>> key1, key2"). For this I have created a simple Flink job which uses
>>>> flink-kinesis connector and applies keyby() and sum() on the incoming
>>>> events. I am facing two challenges here:
>>>>
>>>> 1) The incoming events can have duplicates. How to maintain exactly
>>>> once processing here, as processing duplicate events can give me erroneous
>>>> result? The field transaction_id will be unique for each events. If two
>>>> events have same transaction_id, we can say that they are duplicates (By
>>>> duplicates here, I don't just mean the retried ones. The same message can
>>>> be present in kinesis with different sequence number. I am not sure if
>>>> flink-kinesis connector can handle that, as it is using KCL underlying
>>>> which I assume doesn't take care of it)
>>>>
>>>> 2) There can be cases where the value has been updated for a
>>>> key after processing the event and we may want to reprocess those events
>>>> with new value. Since this is just a value change, the transaction_id will
>>>> be same. The idempotency logic will not allow to reprocess the events. What
>>>> are the ways to handle such scenarios in flink?
>>>>
>>>> Thanks
>>>> Pooja
>>>>
>>>>
>>>> --
>>>> Warm Regards,
>>>> Pooja Agrawal
>>>>
>>>
>>
>> --
>> Warm Regards,
>> Pooja Agrawal
>>
>


Re: Different jobName per Job when reporting Flink metrics to PushGateway

2019-12-17 Thread Zhu Zhu
Hi Sidney,

"metrics.reporter.promgateway.jobName" is a Flink cluster wide config, so
you will need to set it in flink-conf.yaml before launching the Flink
cluster.
An alternative is to use -D(or -yD for yarn) params to override the config
when running a command to launch the Flink session cluster or submit a job
in job cluster mode.

Thanks,
Zhu Zhu

On Tue, Dec 17, 2019 at 11:08 PM Sidney Feiner wrote:

> I'm using Flink 1.9.1 with PrometheusPushGateway to report my metrics. The
> jobName the metrics are reported with is defined in the flink-conf.yaml
> file which makes the jobName identical for all jobs who run on the cluster,
> but I want a different jobName to be reported for every running job. To do
> so, I tried doing the following in my code before executing the Stream:
>
> Configuration conf = GlobalConfiguration.loadConfiguration();
> conf.setString(
>     "metrics.reporter.promgateway.jobName",
>     conf.getString("metrics.reporter.promgateway.jobName", "") + "-" + pipeline);
> final StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setGlobalJobParameters(conf);
>
> Where pipeline is a String variable.
>
> When running the job locally, it worked. But now I'm running flink in High
> Availability mode and it doesn't work anymore :( The config I override in
> the code is ignored.
>
> So how can I change the jobName per job? And if I can't, is there a way to
> set additional Labels when reporting the metrics? Because I haven't seen an
> option for that as well.
>
> Thanks :)
>
>
> I've posted this on StackOverflow as well - here
> <https://stackoverflow.com/questions/59376693/different-jobname-per-job-when-reporting-flink-metrics-to-pushgateway>
> :)
>
>
> *Sidney Feiner* */* Data Platform Developer
> M: +972.528197720 */* Skype: sidney.feiner.startapp
>
>
>


Re: How to reprocess certain events in Flink?

2019-12-17 Thread Zhu Zhu
Hi Pooja,

I'm a bit confused since in 1) it says that "If two events have same
transaction_id, we can say that they are duplicates", and in 2) it says
that "Since this is just a value change, the transaction_id will be same".
Looks to me they are conflicting. Usually in case 2) scenarios, the value
updates event is considered as new event which does not share the unique id
with prior events.

If each event has a unique transaction_id, you can use it to de-duplicate
the events via a set recording the transaction_id(s) which are
already processed. And then 2) would not be a problem with the unique
transaction_id assumption.

Thanks,
Zhu Zhu

On Tue, Dec 17, 2019 at 9:17 PM Pooja Agrawal wrote:

>
> Hi,
>
> I have a use case where we are reading events from kinesis stream.The
> event can look like this
> Event {
>   event_id,
>   transaction_id
>   key1,
>   key2,
>   value,
>   timestamp,
>   some other fields...
> }
>
> We want to aggregate the values per key for all events we have seen till
> now (as simple as "select key1, key2, sum(value) from events group by key1,
> key2"). For this I have created a simple Flink job which uses
> flink-kinesis connector and applies keyby() and sum() on the incoming
> events. I am facing two challenges here:
>
> 1) The incoming events can have duplicates. How to maintain exactly once
> processing here, as processing duplicate events can give me erroneous
> result? The field transaction_id will be unique for each events. If two
> events have same transaction_id, we can say that they are duplicates (By
> duplicates here, I don't just mean the retried ones. The same message can
> be present in kinesis with different sequence number. I am not sure if
> flink-kinesis connector can handle that, as it is using KCL underlying
> which I assume doesn't take care of it)
>
> 2) There can be cases where the value has been updated for a key
> after processing the event and we may want to reprocess those events with
> new value. Since this is just a value change, the transaction_id will be
> same. The idempotency logic will not allow to reprocess the events. What
> are the ways to handle such scenarios in flink?
>
> Thanks
> Pooja
>
>
> --
> Warm Regards,
> Pooja Agrawal
>


Re: [EXTERNAL] Flink and Prometheus monitoring question

2019-12-16 Thread Zhu Zhu
Hi Jesús,
If your job has checkpointing enabled, you can monitor
'numberOfCompletedCheckpoints' to see whether the job is still alive and
healthy.

Thanks,
Zhu Zhu

On Tue, Dec 17, 2019 at 2:43 AM Jesús Vásquez wrote:

> The thing about the numRunningJobs metric is that I have to configure the
> Prometheus rules in advance with the number of jobs I expect to be running
> in order to alert, and I need this rule to alert on individual jobs. I
> initially thought of flink_jobmanager_downtime{job_id=~".*"} == -1 , but it
> turned out that the metric just emits 0 for running jobs and doesn't emit -1
> for failed jobs.
>
> On Mon, Dec 16, 2019 at 7:01 PM, PoolakkalMukkath, Shakir <
> shakir_poolakkalmukk...@comcast.com> wrote:
>
>> You could use “flink_jobmanager_numRunningJobs” to check the number of
>> running jobs.
>>
>>
>>
>> Thanks
>>
>>
>>
>> *From: *Jesús Vásquez 
>> *Date: *Monday, December 16, 2019 at 12:47 PM
>> *To: *"user@flink.apache.org" 
>> *Subject: *[EXTERNAL] Flink and Prometheus monitoring question
>>
>>
>>
>> Hi,
>>
>> I want to monitor Flink Streaming jobs using Prometheus
>>
>> My first goal is to send alerts when a Flink job has failed.
>>
>> The thing is that looking at the documentation I haven't found a metric
>> that helps me defining an alerting rule.
>>
>> As a starting point I thought that the metric
>> flink_jobmanager_job_downtime could help, since the doc says this metric
>> emits -1 for a completed job.
>>
>> But when I tested this I found that it doesn't work, since the metric
>> always emits 0 and after the job is completed there is no metric.
>>
>> Has anyone managed to alert with Prometheus when a Flink job has failed?
>>
>> Thanks for your help.
>>
>


Re: [ANNOUNCE] Apache Flink 1.8.3 released

2019-12-12 Thread Zhu Zhu
Thanks Hequn for driving the release and everyone who makes this release
possible!

Thanks,
Zhu Zhu

On Thu, Dec 12, 2019 at 3:45 PM Wei Zhong wrote:

> Thanks Hequn for being the release manager. Great work!
>
> Best,
> Wei
>
> On Dec 12, 2019, at 15:27, Jingsong Li wrote:
>
> Thanks Hequn for your driving, 1.8.3 fixed a lot of issues and it is very
> useful to users.
> Great work!
>
> Best,
> Jingsong Lee
>
> On Thu, Dec 12, 2019 at 3:25 PM jincheng sun 
> wrote:
>
>> Thanks for being the release manager and the great work Hequn :)
>> Also thanks to the community making this release possible!
>>
>> Best,
>> Jincheng
>>
>> On Thu, Dec 12, 2019 at 3:23 PM Jark Wu wrote:
>>
>>> Thanks Hequn for helping out this release and being the release manager.
>>> Great work!
>>>
>>> Best,
>>> Jark
>>>
>>> On Thu, 12 Dec 2019 at 15:02, Jeff Zhang  wrote:
>>>
>>> > Great work, Hequn
>>> >
>>> > On Thu, Dec 12, 2019 at 2:32 PM Dian Fu wrote:
>>> >
>>> >> Thanks Hequn for being the release manager and everyone who
>>> contributed
>>> >> to this release.
>>> >>
>>> >> Regards,
>>> >> Dian
>>> >>
>>> >> On Dec 12, 2019, at 2:24 PM, Hequn Cheng wrote:
>>> >>
>>> >> Hi,
>>> >>
>>> >> The Apache Flink community is very happy to announce the release of
>>> >> Apache Flink 1.8.3, which is the third bugfix release for the Apache
>>> Flink
>>> >> 1.8 series.
>>> >>
>>> >> Apache Flink® is an open-source stream processing framework for
>>> >> distributed, high-performing, always-available, and accurate data
>>> streaming
>>> >> applications.
>>> >>
>>> >> The release is available for download at:
>>> >> https://flink.apache.org/downloads.html
>>> >>
>>> >> Please check out the release blog post for an overview of the
>>> >> improvements for this bugfix release:
>>> >> https://flink.apache.org/news/2019/12/11/release-1.8.3.html
>>> >>
>>> >> The full release notes are available in Jira:
>>> >> https://issues.apache.org/jira/projects/FLINK/versions/12346112
>>> >>
>>> >> We would like to thank all contributors of the Apache Flink community
>>> who
>>> >> made this release possible!
>>> >> Great thanks to @Jincheng as a mentor during this release.
>>> >>
>>> >> Regards,
>>> >> Hequn
>>> >>
>>> >>
>>> >>
>>> >
>>> > --
>>> > Best Regards
>>> >
>>> > Jeff Zhang
>>> >
>>>
>>
>
> --
> Best, Jingsong Lee
>
>
>


Re: Apache Flink - Retries for async processing

2019-12-10 Thread Zhu Zhu
Hi M Singh,

I think you should be able to tell the cause of a request failure and
whether it is recoverable or not.
You can handle the error as you like.
For example, if you think the error is unrecoverable, you can complete the
ResultFuture exceptionally to expose the failure to the Flink framework. If
the error is recoverable, you can just retry (or refresh the token) and
complete the ResultFuture only once the request succeeds (or the timeout
fires).
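
The retry pattern can be sketched in plain Java roughly as follows. This is
only an illustration under assumptions, not Flink code: AsyncRetrySketch,
RecoverableException, callWithRetry and the attempt limit are hypothetical
names and choices. Inside an AsyncFunction, the final outcome of the
returned future would be forwarded to ResultFuture#complete or
ResultFuture#completeExceptionally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class AsyncRetrySketch {

    /** Hypothetical marker for failures worth retrying, e.g. an expired token. */
    static class RecoverableException extends RuntimeException {
        RecoverableException(String message) {
            super(message);
        }
    }

    /** Runs the request, retrying recoverable failures up to maxAttempts times in total. */
    static <T> CompletableFuture<T> callWithRetry(
            Supplier<CompletableFuture<T>> request,
            Runnable refreshToken,
            int maxAttempts,
            long backoffMillis,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(request, refreshToken, maxAttempts, backoffMillis, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> request,
            Runnable refreshToken,
            int attemptsLeft,
            long backoffMillis,
            ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {
        CompletableFuture<T> call;
        try {
            call = request.get();
        } catch (RuntimeException e) {
            // A request that throws synchronously is treated like a failed future.
            call = new CompletableFuture<>();
            call.completeExceptionally(e);
        }
        call.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
                return;
            }
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause()
                    : error;
            if (cause instanceof RecoverableException && attemptsLeft > 1) {
                // Recoverable: refresh the token and try again after a short delay.
                scheduler.schedule(() -> {
                    try {
                        refreshToken.run();
                        attempt(request, refreshToken, attemptsLeft - 1,
                                backoffMillis, scheduler, result);
                    } catch (RuntimeException e) {
                        // The refresh itself failed: surface that failure.
                        result.completeExceptionally(e);
                    }
                }, backoffMillis, TimeUnit.MILLISECONDS);
            } else {
                // Unrecoverable, or out of attempts: fail the result.
                result.completeExceptionally(cause);
            }
        });
    }
}
```

Retries are scheduled on a separate executor so the calling thread is never
blocked while waiting. The attempt limit is one way to bound the retries;
the async operator's own timeout still applies on top of it.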

Thanks,
Zhu Zhu

On Tue, Dec 10, 2019 at 8:51 PM M Singh wrote:

> Thanks Jingsong for sharing your solution.
>
> Since both refreshing the token and the actual API request can fail with
> either recoverable and unrecoverable exceptions, are there any patterns for
> retrying both and making the code robust to failures.
>
> Thanks again.
>
> On Monday, December 9, 2019, 10:08:39 PM EST, Jingsong Li <
> jingsongl...@gmail.com> wrote:
>
>
> Hi M Singh,
>
> We have this scenario internally too. As far as I know, Flink 1.9 does not
> have a built-in mechanism for this either.
> I can share my solution:
> - In the async function, start a thread factory.
> - Send the call to the thread factory when the call has failed. Refresh the
> security token there too.
> In fact, you can deal with anything inside the function, as long as you
> finally call the relevant methods of ResultFuture.
>
> Best,
> Jingsong Lee
>
> On Tue, Dec 10, 2019 at 3:25 AM M Singh  wrote:
>
> Hi Folks:
>
> I am working on a project where I will be using Flink's async processing
> capabilities.  The job has to make http request using a token.  The token
> expires periodically and needs to be refreshed.
>
> So, I was looking for patterns for handling async call failures and
> retries when the token expires.  I found this link Re: Backoff strategies
> for async IO functions?
> <http://mail-archives.apache.org/mod_mbox/flink-user/201903.mbox/%3CCAC27z=pou2chkxxcomu5ty60n6fhlhjxbwcyb2tqkkz3yrb...@mail.gmail.com%3E>
>  and
> it appears that Flink does not support retries and periodically refresh a
> security token.  I am using 1.6 at the moment but am planning to migrate to
> 1.9 soon.
>
>
> If there are any patterns on how to deal with this scenario, please let me
> know.
>
> Thanks
>
> Mans
>
>
>
> --
> Best, Jingsong Lee
>


Re: Collections as Flink job parameters

2019-11-18 Thread Zhu Zhu
Hi Протченко,

Yes, you cannot get a Map argument from ParameterTool directly.
ParameterTool fetches and stores data as strings, so it is not feasible
for it to support every type of configuration value a user might set.

A workaround is to convert the map to a string beforehand and parse it
later in the main method.
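
One possible shape for that workaround, as a hedged sketch: the class
MapArgumentCodec and the "key=value,key=value" format with URL-style
escaping are assumptions made for illustration, not something ParameterTool
provides. The encoded string would be passed as an ordinary string argument
(say, a hypothetical --userParams option) and decoded in main after reading
it from ParameterTool.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: packs a Map into one string argument and back. */
class MapArgumentCodec {

    /** Encodes the map as "k1=v1,k2=v2" with keys and values URL-escaped. */
    static String encode(Map<String, String> map) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(escape(entry.getKey())).append('=').append(escape(entry.getValue()));
        }
        return sb.toString();
    }

    /** Reverses encode(); an empty or null string yields an empty map. */
    static Map<String, String> decode(String encoded) {
        Map<String, String> map = new LinkedHashMap<>();
        if (encoded == null || encoded.isEmpty()) {
            return map;
        }
        for (String pair : encoded.split(",")) {
            String[] keyValue = pair.split("=", 2);
            map.put(unescape(keyValue[0]), keyValue.length > 1 ? unescape(keyValue[1]) : "");
        }
        return map;
    }

    // Escaping guarantees that ',' and '=' never appear raw inside a key or value.
    private static String escape(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    private static String unescape(String s) {
        try {
            return URLDecoder.decode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Any other string encoding (JSON, for instance) would work equally well; the
point is only that the map travels through ParameterTool as a single string.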

Thanks,
Zhu Zhu

On Tue, Nov 19, 2019 at 12:29 AM Протченко Алексей wrote:

>
> Hello all.
>
> I have a question about providing complex configuration to Flink job. We
> are working on some kind of platform for running user-defined packages
> which actually contain the main business logic. All the parameters we are
> providing via command line and parse with ParameterTool. That’s ok until we
> have parameters of simple types like String, int etc. But the problem is
> that we need to add a Map of custom parameters for users to provide
> configuration variables specific for their code.
>
> Reading documentation and code of ParameterTool I do not see clear
> possibility to do it. Is using third-party arguments parser is the only
> option?
>
> Best regards,
> Alex
>
>
> --
> Алексей Протченко
>


Re: Flink configuration at runtime

2019-11-18 Thread Zhu Zhu
Hi Amran,

Some configs, including "state.checkpoints.num-retained", are cluster
configs that always apply to the entire Flink cluster.
An alternative is to use per-job mode if you are running Flink jobs on
k8s/docker [1] or yarn [2], so that a Flink cluster is created for a single
job.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/deployment/docker.html#flink-job-cluster
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn

Thanks,
Zhu Zhu

On Tue, Nov 19, 2019 at 5:53 AM amran dean wrote:

> Is it possible to configure certain settings at runtime, on a per-job
> basis rather than globally within flink-conf.yaml?
>
> For example, I have a job where it's desirable to retain a large number of
> checkpoints via
> state.checkpoints.num-retained.
>
> The checkpoints are cheap, and it's low cost. For other jobs, I don't want
> such a large number.
>
>
>


Re: Job Distribution Strategy On Cluster.

2019-11-11 Thread Zhu Zhu
There is no plan for release 1.9.2 yet.
Flink 1.10.0 is planned to be released in early January.

Thanks,
Zhu Zhu

On Mon, Nov 11, 2019 at 9:53 PM srikanth flink wrote:

> Zhu Zhu,
>
> That's awesome and is what I'm looking for.
> Any update on when would be the next release date?
>
> Thanks
> Srikanth
>
> On Mon, Nov 11, 2019 at 3:40 PM Zhu Zhu  wrote:
>
>> Hi Srikanth,
>>
>> Is this issue what you encounter? FLINK-12122: a job would tend to fill
>> one TM before using another.
>> If it is, you may need to wait for the release 1.9.2 or 1.10, since it is
>> just fixed.
>>
>> Thanks,
>> Zhu Zhu
>>
>> On Mon, Nov 11, 2019 at 5:48 PM vino yang wrote:
>>
>>> Hi srikanth,
>>>
>>> What's your job's parallelism?
>>>
>>> In some scenarios, many operators are chained with each other. If its
>>> parallelism is 1, it would just use a single slot.
>>>
>>> Best,
>>> Vino
>>>
>>> On Wed, Nov 6, 2019 at 10:03 PM srikanth flink wrote:
>>>
>>>> Hi there,
>>>>
>>>> I'm running Flink with 3 node cluster.
>>>> While running my jobs(both SQL client and jar submission), the jobs are
>>>> being assigned to single machine instead of distribution among the cluster.
>>>> How could I achieve the job distribution to make use of the computation
>>>> power?
>>>>
>>>> Thanks
>>>> Srikanth
>>>>
>>>


Re: Job Distribution Strategy On Cluster.

2019-11-11 Thread Zhu Zhu
Hi Srikanth,

Is this issue what you encounter? FLINK-12122: a job would tend to fill one
TM before using another.
If it is, you may need to wait for the release 1.9.2 or 1.10, since it is
just fixed.

Thanks,
Zhu Zhu

On Mon, Nov 11, 2019 at 5:48 PM vino yang wrote:

> Hi srikanth,
>
> What's your job's parallelism?
>
> In some scenarios, many operators are chained with each other. If its
> parallelism is 1, it would just use a single slot.
>
> Best,
> Vino
>
> On Wed, Nov 6, 2019 at 10:03 PM srikanth flink wrote:
>
>> Hi there,
>>
>> I'm running Flink with 3 node cluster.
>> While running my jobs(both SQL client and jar submission), the jobs are
>> being assigned to single machine instead of distribution among the cluster.
>> How could I achieve the job distribution to make use of the computation
>> power?
>>
>> Thanks
>> Srikanth
>>
>


Re: Flink disaster recovery test problems

2019-11-11 Thread Zhu Zhu
Hi Zhong,

It looks like you are assigning tasks to different slot sharing groups to
force them not to share the same slot.
So you will need at least 2 slots for the streaming job to start running
successfully.
Killing one of the 2 TMs, with one slot each, will lead to insufficient
slots, and your job will hang at slot allocation.

Task state is needed so that unprocessed source data is not skipped, i.e.
to avoid data loss. It is also needed if you want a failed task to recover
to the state right before the failure.
Checkpointing is needed to persist the task state. If it is not enabled,
the job will restart with the initial state, i.e. the job will consume data
from the very beginning and a large amount of data may be reprocessed.

Thanks,
Zhu Zhu

On Tue, Nov 5, 2019 at 3:01 PM 钟旭阳 wrote:

> hello:
>
>
> I am currently learning Flink. I recently ran into a problem while doing
> disaster recovery testing with Flink. I tried to find an answer on the
> official website and blogs but failed, so I am asking the community for help.
>
>
> The current situation is: I have two servers, each with one slot. My
> application has two operators, each with a parallelism of 1,
> using the slotSharingGroup function to make them run in these two slots
> respectively.
>
>
> My disaster recovery test is to shut down one of the servers. But is it
> possible for the two operators to compete for the same server slot? In
> addition to this, I want to dynamically add or remove servers (simulated
> power failures, etc.) while Flink is running, but I think this must cause
> stream data loss. Is restarting Flink through the checkpoint mechanism the
> only way to ensure that data is not lost while the number of
> servers is changed dynamically?
>
>
> Best
> Zhong


Re: Flink batch app occasionally hang

2019-10-29 Thread Zhu Zhu
Hi Caio,

Did you check whether there are enough resources to launch the other nodes?

Could you attach the logs you mentioned? And elaborate how the tasks are
connected in the topology?


Thanks,
Zhu Zhu

On Wed, Oct 30, 2019 at 8:31 AM Caio Aoque wrote:

> Hi, I've been running some flink scala applications on an AWS EMR cluster
> (version 5.26.0 with flink 1.8.0 for scala 2.11) for a while and I
> started to have some issues now.
>
> I have a flink app that reads some files from S3, process them and save
> some files to s3 and also some records to a database.
>
> The application is not so complex it has a source that reads a directory
> (multiple files) and other one that reads a single one and then it has some
> grouping and mapping and a left outer join between these 2 sources.
>
> The issue is that occasionally the application gets stuck with only two
> tasks running, one finished, and the other ones never started. The 2 tasks
> that keep running forever are source1 from the directory (multiple files)
> and the leftouterjoin; source2 (input from a single file) is the one
> that finishes. One interesting thing is that there should be several tasks
> between source 1 and this leftouterjoin, but they remain in CREATED state.
> If the app gets stuck I usually just kill it and run it again, which
> works. The issue used to be infrequent but is getting more and more
> frequent. It's happening almost every day now.
>
> I also have a DEBUG log from a job that didn't work and another one from a
> job that worked.
>
> Thanks.
>


Re: Data processing with HDFS local or remote

2019-10-20 Thread Zhu Zhu
Sources of batch jobs process InputSplits. Each InputSplit can be a file or
a file block, depending on the FileSystem (for HDFS it is a file block).
Sources retrieve the InputSplits to process from the InputSplitAssigner at
the JM.
In this way, the assignment of InputSplits to source tasks can take the
InputSplit location and the task location into consideration to support
input locality.

To enable this input locality support, it is required to use an InputFormat
which leverages LocatableInputSplitAssigner and LocatableInputSplit, e.g.
FileInputFormat, HadoopInputFormat, etc.
The file reading source interfaces provided in ExecutionEnvironment, like
#readTextFile and #readFile, use FileInputFormat, so input locality is
supported by default.
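
The "local splits first" behaviour can be pictured with a small plain-Java
sketch. This is a simplified model, not the actual
LocatableInputSplitAssigner: the class LocalFirstSplitAssigner, its Split
type and the single-host-per-split assumption are made up for illustration
(an HDFS block normally has several replica hosts).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Simplified, hypothetical model of locality-aware split assignment. */
class LocalFirstSplitAssigner {

    /** A unit of input together with the host that stores it. */
    static final class Split {
        final String name;
        final String host;

        Split(String name, String host) {
            this.name = name;
            this.host = host;
        }
    }

    private final List<Split> remaining;

    LocalFirstSplitAssigner(List<Split> splits) {
        this.remaining = new ArrayList<>(splits);
    }

    /** Hands out a split stored on the requesting host if one is left, else any split, else null. */
    synchronized Split next(String taskHost) {
        for (Iterator<Split> it = remaining.iterator(); it.hasNext(); ) {
            Split split = it.next();
            if (split.host.equals(taskHost)) {
                it.remove();
                return split; // local read
            }
        }
        // No local split left: fall back to a remote one so the task stays busy.
        return remaining.isEmpty() ? null : remaining.remove(0);
    }
}
```

Because tasks pull splits on demand instead of being placed next to the
data, locality is best effort: a task reads remotely once its local splits
are exhausted.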

Thanks,
Zhu Zhu

On Mon, Oct 21, 2019 at 10:17 AM Pritam Sadhukhan wrote:

> Hi Zhu Zhu,
>
> Thanks for your detailed answer.
> Can you please help me to understand how flink task process the data
> locally on data nodes first?
> I want to understand how flink determines the processing to be done at the
> data nodes?
>
> Regards,
> Pritam.
>
> On Sat, 19 Oct 2019 at 08:16, Zhu Zhu  wrote:
>
>> Hi Pritam,
>>
>> Flink does not deploy tasks to certain nodes according to source data
>> locations.
>> Instead, it will let a task process local input splits (data on the same
>> node) first.
>> So if your parallelism is large enough to distribute on all the data
>> nodes, most data can be processed locally.
>>
>> Thanks,
>> Zhu Zhu
>>
>> On Fri, Oct 18, 2019 at 10:59 AM Pritam Sadhukhan wrote:
>>
>>> Hi,
>>>
>>> I am trying to process data stored on HDFS using flink batch jobs.
>>> Our data is split across 16 data nodes.
>>>
>>> I am curious to know how data will be pulled from the data nodes with
>>> the same number of parallelism set as the data split on HDFS i.e. 16.
>>>
>>> Is the flink task being executed locally on the data node server or it
>>> will happen in the flink nodes where data will be pulled remotely?
>>>
>>> Any help will be appreciated.
>>>
>>> Regards,
>>> Pritam.
>>>
>>


Re: Data processing with HDFS local or remote

2019-10-18 Thread Zhu Zhu
Hi Pritam,

Flink does not deploy tasks to certain nodes according to source data
locations.
Instead, it will let a task process local input splits (data on the same
node) first.
So if your parallelism is large enough to distribute on all the data nodes,
most data can be processed locally.

Thanks,
Zhu Zhu

On Fri, Oct 18, 2019 at 10:59 AM Pritam Sadhukhan wrote:

> Hi,
>
> I am trying to process data stored on HDFS using flink batch jobs.
> Our data is split across 16 data nodes.
>
> I am curious to know how data will be pulled from the data nodes with the
> same number of parallelism set as the data split on HDFS i.e. 16.
>
> Is the flink task being executed locally on the data node server or it
> will happen in the flink nodes where data will be pulled remotely?
>
> Any help will be appreciated.
>
> Regards,
> Pritam.
>


Re: Is it possible to get Flink job name in an operator?

2019-10-15 Thread Zhu Zhu
I think ExecutionConfig.GlobalJobParameters is the way to do this if you
want to retrieve it in runtime.
Or you can just pass the name to each operator you implement to have it
serialized together with the udf.

Thanks,
Zhu Zhu

On Tue, Oct 15, 2019 at 3:11 PM 马阳阳 wrote:

> As the title. Is it possible now? Or if we can do something to achieve
> this. I tried to put the job name into the 
> ExecutionConfig.GlobalJobParameters.
> But it is not possible to get the job name before Environment.execute() is
> called.
>
> Best regards,
> mayangyang
>


Re: Discard message on deserialization errors.

2019-10-12 Thread Zhu Zhu
I mean the Kafka source provided in Flink correctly ignores null
deserialized values.

isEndOfStream allows you to control when to end the input stream.
If it is used for running infinite stream jobs, you can simply return false.

Thanks,
Zhu Zhu

On Sat, Oct 12, 2019 at 8:40 PM John Smith wrote:

> The Kafka Fetcher you mean the flink JSON schemas? They throw IOExceptions?
>
> Also what's the purpose of isEndOfStream most schemas I looked at don't do
> anything but just return false?
>
> On Fri., Oct. 11, 2019, 11:44 p.m. Zhu Zhu,  wrote:
>
>> Hi John,
>>
>> It should work with a *null* return value.
>> In the java doc of DeserializationSchema#deserialize it says that
>>
>>> *@return The deserialized message as an object (null if the message
>>> cannot be deserialized).*
>>
>>
>> I also checked the Kafka fetcher in Flink and it can correctly handle a
>> null deserialized record.
>>
>> Just make sure that *DeserializationSchema#isEndOfStream* does not throw
>> errors when given a null record either.
>>
>> Thanks,
>> Zhu Zhu
>>
>> On Sat, Oct 12, 2019 at 5:36 AM John Smith wrote:
>>
>>> Hi using Flink 1.8.0.
>>>
>>> I am ingesting data from Kafka, unfortunately for the time being I have
>>> not looked into using the schema registry.
>>>
>>> So for now I would like to write a simple deserialization schema that
>>> discards the data if deserialization fails.
>>>
>>> The other option is to do in flat map with markers and split to dead
>>> letter queue, but I'm not too concerned about that for now.
>>>
>>> Is it ok to just return null if deserialization fails?
>>>
>>> @Override
>>> public MyObject deserialize(byte[] message) {
>>>     try {
>>>         return MyDecoder.decode(message);
>>>     } catch (IOException ex) {
>>>         logger.warn("Failed to decode message.", ex);
>>>         return null;
>>>     }
>>> }
>>>
>>>


Re: Discard message on deserialization errors.

2019-10-11 Thread Zhu Zhu
Hi John,

It should work with a *null* return value.
In the java doc of DeserializationSchema#deserialize it says that

> *@return The deserialized message as an object (null if the message cannot
> be deserialized).*


I also checked the Kafka fetcher in Flink and it can correctly handle a
null deserialized record.

Just make sure that *DeserializationSchema#isEndOfStream* does not throw
errors when given a null record either.

Thanks,
Zhu Zhu

On Sat, Oct 12, 2019 at 5:36 AM John Smith wrote:

> Hi using Flink 1.8.0.
>
> I am ingesting data from Kafka, unfortunately for the time being I have
> not looked into using the schema registry.
>
> So for now I would like to write a simple deserialization schema that
> discards the data if deserialization fails.
>
> The other option is to do in flat map with markers and split to dead
> letter queue, but I'm not too concerned about that for now.
>
> Is it ok to just return null if deserialization fails?
>
> @Override
> public MyObject deserialize(byte[] message) {
>     try {
>         return MyDecoder.decode(message);
>     } catch (IOException ex) {
>         logger.warn("Failed to decode message.", ex);
>         return null;
>     }
> }
>
>


Re: Where are uploaded Job jars stored?

2019-10-10 Thread Zhu Zhu
Hi John,

Not sure why you need to know the location of uploaded job jars?

The job jar will be automatically localized to a taskmanager via
BlobService when a task belonging to the job is running on that taskmanager.
The localization dir is blob.storage.directory. If it is not specified, it
will be java.io.tmpdir in standalone mode.

Thanks,
Zhu Zhu

On Fri, Oct 11, 2019 at 2:41 AM John Smith wrote:

> And can that folder be shared so that all nodes see it?
>
> On Thu, 10 Oct 2019 at 14:36, Yun Tang  wrote:
>
>> Hi John
>>
>> The jar is not stored in HA path, I think the answer [1] could help you.
>>
>> [1]
>> https://stackoverflow.com/questions/51936608/where-can-i-find-my-jar-on-apache-flink-server-which-i-submitted-using-apache-fl
>>
>> Best
>> Yun Tang
>> --
>> *From:* John Smith 
>> *Sent:* Friday, October 11, 2019 2:06
>> *To:* user 
>> *Subject:* Where are uploaded Job jars stored?
>>
>> Hi using 1.8.0 running on standalone cluster with Zookeeper HA.
>>
>> Are job JARs stored at: high-availability.storageDir ???
>>
>> The thing is when you browse the individual nodes at port 8080 to go
>> submit the job only the node where you uploaded the JAR has it.
>>
>> - Go to any given node
>> - Upload a jar
>> - Browse another node
>> - Jar is not there.
>>
>>
>>


Re: ** Help need w.r.t parallelism settings in flink **

2019-09-26 Thread Zhu Zhu
Hi Akshay,

For your questions,
1. One main purpose of maxParallelism is to determine the number of key
groups. A key group is the bucket that keys are assigned to during keyBy
partitioning, so a larger maxParallelism means a finer granularity of key
distribution. This applies whether or not the operator is stateful.

2. You can set only the parallelism, and Flink will derive the
maxParallelism automatically.
However, it is recommended to set maxParallelism explicitly to a fixed,
suitable value.

3. The parallelism is the parallelism actually used.
maxParallelism is the upper bound on the parallelism when you later change
it by manually rescaling the job.
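
To make the relation between the two settings concrete, here is a simplified sketch of how a key could be routed. It assumes a plain floorMod of hashCode() in place of the stronger hash that Flink applies internally, and the class and method names are made up for illustration.

```java
// Simplified sketch of key-group routing. Assumption: a plain floorMod of
// hashCode() stands in for the stronger hash Flink applies internally.
final class KeyGroupSketch {

    // Which key group (0 .. maxParallelism-1) a key falls into.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Which parallel subtask (0 .. parallelism-1) owns a key group.
    static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 32;
        for (int parallelism : new int[] {8, 16}) {
            int keyGroup = keyGroupFor("user-42", maxParallelism);
            System.out.println("parallelism=" + parallelism
                    + " keyGroup=" + keyGroup
                    + " subtask=" + subtaskFor(keyGroup, parallelism, maxParallelism));
        }
    }
}
```

With parallelism 8 and maxParallelism 32, each subtask owns 4 key groups. The key group of a key stays the same when the parallelism changes; only the subtask that owns the group moves. Beyond 32 subtasks there would be no key groups left to hand out, which is why maxParallelism is the upper bound.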

Thanks,
Zhu Zhu

On Fri, Sep 27, 2019 at 4:20 AM Akshay Iyangar wrote:

> Hi
>
> So we are running a beam pipeline that uses flink as its execution engine.
> We are currently on flink1.8
>
> So per the flink documentation I see that there is an option that allows you
> to set *parallelism* and *maxParallelism*.
>
> We actually want to set both so that we can dynamically scale the pipeline
> if there is back pressure on it.
>
> I want to clarify a few doubts I had:
>
>    1. Does the maxParallelism only work for stateful operators or does it
>    work for all the operators?
>    2. Also is the setting either or? Like we can only set parallelism or
>    maxParallelism? Or is it that we can set both?
>    3. Because, currently I have the parallelism at 8 and maxParallelism
>    at 32 but the UI only shows me that it is using parallelism of 8. It would
>    be great if someone helps me understand the exact behavior of these
>    parameters.
>
> Thanks
>
> Akshay Iyangar
>
>
>


Re: Anomaly in handling late arriving data

2019-09-25 Thread Zhu Zhu
Hi Indraneel,

In your case, ("u1", "e12", 8L) is not considered late and goes into the
session window {e7,e8,e9,e11} (range=11~19).
This is because 8+3 (the session gap) >= 11, the lower bound of the existing
session window.

Regarding your 3 questions:
*>> 1) Event("u1", "e12", 8L) change to Event("u1", "e12", 7L)*
7+3 < 11, so e12 will not go into the session window {e7,e8,e9,e11}.
Instead, it is treated as late and emitted to the side output.

*>> 2) allowedLateness(Time.milliseconds(2L)) change
to allowedLateness(Time.milliseconds(1L)) *
Reducing the allowedLateness causes window {e7,e8} to be fired and purged
when e9 arrives.
So when e12 arrives, the existing session window is {e9,e11} (range=14~19).
e12 is considered late in this case.

*>> 3)   Event("u1", "e12", 8L) change to Event("u1", "e12", 7L) AND
allowedLateness(Time.milliseconds(2L)) change
to allowedLateness(Time.milliseconds(4L)) *
The same as case 1).
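
The arithmetic behind these cases can be sketched as below. This is a simplified model, not Flink code: the class and method names are made up, and it assumes the watermark has reached 15 (from e9, 16 - 1) by the time e12 arrives.

```java
// Sketch of the two checks discussed above, using the numbers from this
// thread. Class and method names are made up; the rules are a simplified
// model of session window behavior, not copied from Flink.
final class SessionLatenessSketch {

    static final long GAP = 3L;

    // A new event at timestamp ts opens the proto-window [ts, ts + GAP). It
    // merges with an existing session [start, end) if the two ranges touch.
    static boolean mergesInto(long ts, long start, long end) {
        return ts <= end && ts + GAP >= start;
    }

    // A window [start, end) counts as late once the watermark has reached
    // its last timestamp (end - 1) plus the allowed lateness.
    static boolean isLate(long start, long end, long allowedLateness, long watermark) {
        return (end - 1) + allowedLateness <= watermark;
    }

    public static void main(String[] args) {
        long watermark = 15L; // assumed highest watermark before e12 arrives

        // Original case: e12 at 8 touches the session 11~19 and merges.
        System.out.println(mergesInto(8L, 11L, 19L));       // true
        // The merged window [8, 19) is not late with allowedLateness 2.
        System.out.println(isLate(8L, 19L, 2L, watermark)); // false
        // Case 1: e12 at 7 does not touch it and stays alone as [7, 10).
        System.out.println(mergesInto(7L, 11L, 19L));       // false
        System.out.println(isLate(7L, 10L, 2L, watermark)); // true
        // Case 3: allowedLateness 4 is still not enough for [7, 10).
        System.out.println(isLate(7L, 10L, 4L, watermark)); // true
    }
}
```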

Thanks,
Zhu Zhu

On Thu, Sep 26, 2019 at 2:24 AM Indraneel R wrote:

> Hi Everyone,
>
> I am trying to execute this simple sessionization pipeline, with the
> allowed lateness shown below:
>
> def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> env.setParallelism(2)
>
>
> val source: DataStream[Event] = env.addSource(new
> SourceFunction[Event] {
>   lazy val input: Seq[Event] = Seq(
> Event("u1", "e1", 1L),
> Event("u1", "e5", 6L),
> Event("u1", "e7", 11L),
> Event("u1", "e8", 12L),
> Event("u1", "e9", 16L),
> Event("u1", "e11", 14L),
> *Event("u1", "e12", 8L),*
> Event("u1", "e13", 20L),
>   )
>
>   override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {
> {
>   input.foreach(event => {
> ctx.collectWithTimestamp(event, event.timestamp)
> *ctx.emitWatermark(new Watermark(event.timestamp - 1))*
>   })
>   ctx.emitWatermark(new Watermark(Long.MaxValue))
> }
>   }
>
>   override def cancel(): Unit = {}
> })
>
> val tag: OutputTag[Event] = OutputTag("late-data")
>
> val sessionizedStream: DataStream[Event] = source
>   .keyBy(item => item.userId)
> *  .window(EventTimeSessionWindows.withGap(Time.milliseconds(3L)))*
>   .sideOutputLateData(tag)
> *  .allowedLateness(Time.milliseconds(2L))*
>   .process(new ProcessWindowFunction[Event, Event, String, TimeWindow]
> {
>
> override def process(key: String, context: Context, elements:
> Iterable[Event], out: Collector[Event]): Unit = {
>   val sessionIdForWindow = key + "-" + context.currentWatermark +
> "-" + context.window.getStart
>
>   elements.toSeq
> .sortBy(event => event.timestamp)
> .foreach(event => {
>   out.collect(event.copy(sessionId = sessionIdForWindow, count
> = elements.size))
> })
> }
>   })
>
> sessionizedStream.getSideOutput(tag).print()
> env.execute()
>   }
>
> But here's the problem. I am expecting the event highlighted in red
> above (e12) to be collected in the side output as a late event.
>
> But it isn't. The event is not printed.
>
> What's interesting is, if I make *any one* of the following changes, the
> event e12 is considered late and is printed.
>1) Event("u1", "e12", 8L) *change to *Event("u1", "e12", *7L*)
>2) allowedLateness(Time.milliseconds(2L))   change
> to allowedLateness(Time.milliseconds(*1L*))
>   3)   Event("u1", "e12", 8L) *change to *Event("u1", "e12",
> *7L*) *AND*
> allowedLateness(Time.milliseconds(2L))   *change to *
> allowedLateness(Time.milliseconds(4*L*))   // or anything less than 7L
>
> Can someone explain what's going on? What am I missing here?
>
>
> regards
> -Indraneel
>
>


Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-25 Thread Zhu Zhu
We will then stick with the decision not to support customized restart
strategies in Flink 1.10.

Thanks Steven for the inputs!

Thanks,
Zhu Zhu

On Thu, Sep 26, 2019 at 12:13 AM Steven Wu wrote:

> Zhu Zhu, that is correct.
>
> On Tue, Sep 24, 2019 at 8:04 PM Zhu Zhu  wrote:
>
>> Hi Steven,
>>
>> As a conclusion, since we will have a meter metric[1] for restarts,
>> customized restart strategy is not needed in your case.
>> Is that right?
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-14164
>>
>> Thanks,
>> Zhu Zhu
>>
>> On Wed, Sep 25, 2019 at 2:30 AM Steven Wu wrote:
>>
>>> Zhu Zhu,
>>>
>>> Sorry, I was using different terminology. yes, Flink meter is what I was
>>> talking about regarding "fullRestarts" for threshold based alerting.
>>>
>>> On Mon, Sep 23, 2019 at 7:46 PM Zhu Zhu  wrote:
>>>
>>>> Steven,
>>>>
>>>> In my mind, Flink counter only stores its accumulated count and reports
>>>> that value. Are you using an external counter directly?
>>>> Maybe Flink Meter/MeterView is what you need? It stores the count and
>>>> calculates the rate. And it will report its "count" as well as "rate" to
>>>> external metric services.
>>>>
>>>> The counter "task_failures" only works if the individual failover
>>>> strategy is enabled. However, it is not a public interface and is not
>>>> suggested to use, as the fine grained recovery (region failover) now
>>>> supersedes it.
>>>> I've opened a ticket[1] to add a metric to show failovers that respects
>>>> fine grained recovery.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-14164
>>>>
>>>> Thanks,
>>>> Zhu Zhu
>>>>
>>>> On Tue, Sep 24, 2019 at 6:41 AM Steven Wu wrote:
>>>>
>>>>>
>>>>> When we setup alert like "fullRestarts > 1" for some rolling window,
>>>>> we want to use counter. if it is a Gauge, "fullRestarts" will never go
>>>>> below 1 after a first full restart. So alert condition will always be
>>>>> true after first job restart. If we can apply a derivative to the Gauge
>>>>> value, I guess alert can probably work. I can explore if that is an
>>>>> option or not.
>>>>>
>>>>> Yeah. Understood that "fullRestart" won't increment when fine grained
>>>>> recovery happened. I think "task_failures" counter already exists in
>>>>> Flink.
>>>>>
>>>>>
>>>>>
>>>>> On Sun, Sep 22, 2019 at 7:59 PM Zhu Zhu  wrote:
>>>>>
>>>>>> Steven,
>>>>>>
>>>>>> Thanks for the information. If we can determine this a common issue,
>>>>>> we can solve it in Flink core.
>>>>>> To get to that state, I have two questions which need your help:
>>>>>> 1. Why is gauge not good for alerting? The metric "fullRestart" is a
>>>>>> Gauge. Does the metric reporter you use report Counter and
>>>>>> Gauge to external services in different ways? Or anything else can
>>>>>> be different due to the metric type?
>>>>>> 2. Is the "number of restarts" what you actually need, rather than
>>>>>> the "fullRestart" count? If so, I believe we will have such a counter
>>>>>> metric in 1.10, since the previous "fullRestart" metric value is not the
>>>>>> number of restarts when grained recovery (feature added 1.9.0) is
>>>>>> enabled.
>>>>>> "fullRestart" reveals how many times entire job graph has been
>>>>>> restarted. If grained recovery (feature added 1.9.0) is enabled, the
>>>>>> graph would not be restarted when task failures happen and the
>>>>>> "fullRestart" value will not increment in such cases.
>>>>>>
>>>>>> I'd appreciate if you can help with these questions and we can make
>>>>>> better decisions for Flink.
>>>>>>
>>>>>> Thanks,
>>>>>> Zhu Zhu
>>>>>>
>>>>>> On Sun, Sep 22, 2019 at 3:31 AM Steven Wu wrote:
>>>>>>
>>>>>>> Zhu Zhu,
>>>>>>>
>>>>>>> Flink 

Re: NoClassDefFoundError in failing-restarting job that uses url classloader

2019-09-25 Thread Zhu Zhu
Yes. 1.8.2 contains all commits in 1.8.1.

On Wed, Sep 25, 2019 at 5:03 PM Subramanyam Ramanathan wrote:

> Hi Zhu,
>
>
>
> Thanks a lot !
>
> Since 1.8.2 is also available, would it be right to assume 1.8.2 would
> also contain the fix ?
>
>
>
> Thanks,
>
> Subbu
>
>
>
>
>
> *From:* Zhu Zhu [mailto:reed...@gmail.com]
> *Sent:* Tuesday, September 24, 2019 9:39 PM
> *To:* Subramanyam Ramanathan 
> *Cc:* Dian Fu ; user@flink.apache.org
> *Subject:* Re: NoClassDefFoundError in failing-restarting job that uses
> url classloader
>
>
>
> Hi Subramanyam,
>
>
>
> I checked the commits.
>
> There are 2 fixes in FLINK-10455, only release 1.8.1 and release 1.9.0
> contain both of them.
>
>
>
> Thanks,
>
> Zhu Zhu
>
>
>
> On Tue, Sep 24, 2019 at 11:02 PM Subramanyam Ramanathan wrote:
>
> Hi Zhu,
>
>
>
> We also use FlinkKafkaProducer(011), hence I felt this fix would also be
> needed for us.
>
>
>
> I agree that the fix for the issue I had originally mentioned would not be
> fixed by this, but I felt that I should be consuming this fix also.
>
>
>
> Thanks,
>
> Subbu
>
>
>
> *From:* Zhu Zhu [mailto:reed...@gmail.com]
> *Sent:* Tuesday, September 24, 2019 6:13 PM
> *To:* Subramanyam Ramanathan 
> *Cc:* Dian Fu ; user@flink.apache.org
> *Subject:* Re: NoClassDefFoundError in failing-restarting job that uses
> url classloader
>
>
>
> Hi Subramanyam,
>
>
>
> I think you do not need the fix in FLINK-10455 which is for Kafka only.
> It's just a similar issue as you met.
>
> As you said, we need to make sure that the operator/UDF spawned threads
> are stopped in the close() method. In this way, we can avoid the thread to
> throw NoClassDefFoundError due to the class loader gets closed.
>
>
>
> Thanks,
>
> Zhu Zhu
>
>
>
>
>
> On Tue, Sep 24, 2019 at 8:07 PM Subramanyam Ramanathan wrote:
>
> Hi,
>
>
>
> Thank you.
>
> I think the takeaway for us is that we need to make sure that the threads
> are stopped in the close() method.
>
>
>
> With regard to FLINK-10455, I see that the fix versions say : 1.5.6,
> 1.7.0, 1.7.3, 1.8.1, 1.9.0
>
>
>
> However, I’m unable to find 1.7.3 in the downloads page(
> https://flink.apache.org/downloads.html). Is it yet to be released, or
> perhaps I am not looking in the right place ?
>
> We’re currently using 1.7.2. Could you please let me know what is the
> minimal upgrade for me to consume the fix for FLINK-10455 ?
>
>
>
> Thanks,
>
> Subbu
>
>
>
> *From:* Dian Fu [mailto:dian0511...@gmail.com]
> *Sent:* Monday, September 23, 2019 1:54 PM
> *To:* Subramanyam Ramanathan 
> *Cc:* Zhu Zhu ; user@flink.apache.org
> *Subject:* Re: NoClassDefFoundError in failing-restarting job that uses
> url classloader
>
>
>
> Hi Subbu,
>
>
>
> The issue you encountered is very similar to the issue which has been
> fixed in FLINK-10455 [1]. Could you check if that fix could solve your
> problem? The root cause for that issue is that the method close() has not
> closed all things. After the method "close()" is called, the classloader
> (URLClassloader) will be closed. If there is thread still running after
> "close()" method is called, it may access the classes in user provided
> jars. However, as the URLClassloader has already been closed,
> NoClassDefFoundError will be thrown.
>
>
>
> Regards,
>
> Dian
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-10455
>
>
>
> On Sep 23, 2019, at 2:50 PM, Subramanyam Ramanathan <
> subramanyam.ramanat...@microfocus.com> wrote:
>
>
>
> Hi,
>
>
>
> I was able to simulate the issue again and understand the cause a little
> better.
>
>
>
> The issue occurs when :
>
> - One of the RichMapFunction transformations uses a third party
>   library in the open() method that spawns a thread.
>
> - The thread doesn’t get properly closed in the close() method.
>
> - Once the job starts failing, we start seeing a NoClassDefFound
>   error from that thread.
>
>
>
> I understand that cleanup should be done in the close() method. However,
> just wanted to know, do we have some kind of a configuration setting  which
> would help us clean up such threads ?
>
> I can attach the code if required.
>
>
>
> Thanks,
>
> Subbu
>
>
>
> *From:* Zhu Zhu [mailto:reed...@gmail.com ]
> *Sent:* Friday, August 9, 2019 7:43 AM
> *To:* Subramanyam Ramanathan 
> *Cc:* user@flink.apache.org
> *Subject:* Re: NoClassDefFoundError in failing-restarting job that uses
> url classloader

Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-24 Thread Zhu Zhu
Hi Steven,

To conclude: since we will have a meter metric [1] for restarts, a
customized restart strategy is not needed in your case.
Is that right?

[1] https://issues.apache.org/jira/browse/FLINK-14164

Thanks,
Zhu Zhu

On Wed, Sep 25, 2019 at 2:30 AM Steven Wu wrote:

> Zhu Zhu,
>
> Sorry, I was using different terminology. yes, Flink meter is what I was
> talking about regarding "fullRestarts" for threshold based alerting.
>
> On Mon, Sep 23, 2019 at 7:46 PM Zhu Zhu  wrote:
>
>> Steven,
>>
>> In my mind, Flink counter only stores its accumulated count and reports
>> that value. Are you using an external counter directly?
>> Maybe Flink Meter/MeterView is what you need? It stores the count and
>> calculates the rate. And it will report its "count" as well as "rate" to
>> external metric services.
>>
>> The counter "task_failures" only works if the individual failover
>> strategy is enabled. However, it is not a public interface and is not
>> suggested to use, as the fine grained recovery (region failover) now
>> supersedes it.
>> I've opened a ticket[1] to add a metric to show failovers that respects
>> fine grained recovery.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-14164
>>
>> Thanks,
>> Zhu Zhu
>>
>> On Tue, Sep 24, 2019 at 6:41 AM Steven Wu wrote:
>>
>>>
>>> When we setup alert like "fullRestarts > 1" for some rolling window, we
>>> want to use counter. if it is a Gauge, "fullRestarts" will never go below 1
>>> after a first full restart. So alert condition will always be true after
>>> first job restart. If we can apply a derivative to the Gauge value, I guess
>>> alert can probably work. I can explore if that is an option or not.
>>>
>>> Yeah. Understood that "fullRestart" won't increment when fine grained
>>> recovery happened. I think "task_failures" counter already exists in Flink.
>>>
>>>
>>>
>>> On Sun, Sep 22, 2019 at 7:59 PM Zhu Zhu  wrote:
>>>
>>>> Steven,
>>>>
>>>> Thanks for the information. If we can determine this a common issue, we
>>>> can solve it in Flink core.
>>>> To get to that state, I have two questions which need your help:
>>>> 1. Why is gauge not good for alerting? The metric "fullRestart" is a
>>>> Gauge. Does the metric reporter you use report Counter and
>>>> Gauge to external services in different ways? Or anything else can be
>>>> different due to the metric type?
>>>> 2. Is the "number of restarts" what you actually need, rather than
>>>> the "fullRestart" count? If so, I believe we will have such a counter
>>>> metric in 1.10, since the previous "fullRestart" metric value is not the
>>>> number of restarts when grained recovery (feature added 1.9.0) is enabled.
>>>> "fullRestart" reveals how many times entire job graph has been
>>>> restarted. If grained recovery (feature added 1.9.0) is enabled, the graph
>>>> would not be restarted when task failures happen and the "fullRestart"
>>>> value will not increment in such cases.
>>>>
>>>> I'd appreciate if you can help with these questions and we can make
>>>> better decisions for Flink.
>>>>
>>>> Thanks,
>>>> Zhu Zhu
>>>>
>>>> On Sun, Sep 22, 2019 at 3:31 AM Steven Wu wrote:
>>>>
>>>>> Zhu Zhu,
>>>>>
>>>>> Flink fullRestart metric is a Gauge, which is not good for alerting
>>>>> on. We publish an equivalent Counter metric for alerting purpose.
>>>>>
>>>>> Thanks,
>>>>> Steven
>>>>>
>>>>> On Thu, Sep 19, 2019 at 7:45 PM Zhu Zhu  wrote:
>>>>>
>>>>>> Thanks Steven for the feedback!
>>>>>> Could you share more information about the metrics you add in your
>>>>>> customized restart strategy?
>>>>>>
>>>>>> Thanks,
>>>>>> Zhu Zhu
>>>>>>
>>>>>> On Fri, Sep 20, 2019 at 7:11 AM Steven Wu wrote:
>>>>>>
>>>>>>> We do use config like "restart-strategy:
>>>>>>> org.foobar.MyRestartStrategyFactoryFactory". Mainly to add additional
>>>>>>> metrics than the Flink provided ones.
>>>>>>>
>>>>>>> On Thu, Sep 19, 2019 at 4:50 

Re: NoClassDefFoundError in failing-restarting job that uses url classloader

2019-09-24 Thread Zhu Zhu
Hi Subramanyam,

I checked the commits.
There are 2 fixes in FLINK-10455; only releases 1.8.1 and 1.9.0
contain both of them.

Thanks,
Zhu Zhu

On Tue, Sep 24, 2019 at 11:02 PM Subramanyam Ramanathan wrote:

> Hi Zhu,
>
>
>
> We also use FlinkKafkaProducer(011), hence I felt this fix would also be
> needed for us.
>
>
>
> I agree that the fix for the issue I had originally mentioned would not be
> fixed by this, but I felt that I should be consuming this fix also.
>
>
>
> Thanks,
>
> Subbu
>
>
>
> *From:* Zhu Zhu [mailto:reed...@gmail.com]
> *Sent:* Tuesday, September 24, 2019 6:13 PM
> *To:* Subramanyam Ramanathan 
> *Cc:* Dian Fu ; user@flink.apache.org
> *Subject:* Re: NoClassDefFoundError in failing-restarting job that uses
> url classloader
>
>
>
> Hi Subramanyam,
>
>
>
> I think you do not need the fix in FLINK-10455 which is for Kafka only.
> It's just a similar issue as you met.
>
> As you said, we need to make sure that the operator/UDF spawned threads
> are stopped in the close() method. In this way, we can avoid the thread to
> throw NoClassDefFoundError due to the class loader gets closed.
>
>
>
> Thanks,
>
> Zhu Zhu
>
>
>
>
>
> On Tue, Sep 24, 2019 at 8:07 PM Subramanyam Ramanathan wrote:
>
> Hi,
>
>
>
> Thank you.
>
> I think the takeaway for us is that we need to make sure that the threads
> are stopped in the close() method.
>
>
>
> With regard to FLINK-10455, I see that the fix versions say : 1.5.6,
> 1.7.0, 1.7.3, 1.8.1, 1.9.0
>
>
>
> However, I’m unable to find 1.7.3 in the downloads page(
> https://flink.apache.org/downloads.html). Is it yet to be released, or
> perhaps I am not looking in the right place ?
>
> We’re currently using 1.7.2. Could you please let me know what is the
> minimal upgrade for me to consume the fix for FLINK-10455 ?
>
>
>
> Thanks,
>
> Subbu
>
>
>
> *From:* Dian Fu [mailto:dian0511...@gmail.com]
> *Sent:* Monday, September 23, 2019 1:54 PM
> *To:* Subramanyam Ramanathan 
> *Cc:* Zhu Zhu ; user@flink.apache.org
> *Subject:* Re: NoClassDefFoundError in failing-restarting job that uses
> url classloader
>
>
>
> Hi Subbu,
>
>
>
> The issue you encountered is very similar to the issue which has been
> fixed in FLINK-10455 [1]. Could you check if that fix could solve your
> problem? The root cause for that issue is that the method close() has not
> closed all things. After the method "close()" is called, the classloader
> (URLClassloader) will be closed. If there is thread still running after
> "close()" method is called, it may access the classes in user provided
> jars. However, as the URLClassloader has already been closed,
> NoClassDefFoundError will be thrown.
>
>
>
> Regards,
>
> Dian
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-10455
>
>
>
> On Sep 23, 2019, at 2:50 PM, Subramanyam Ramanathan <
> subramanyam.ramanat...@microfocus.com> wrote:
>
>
>
> Hi,
>
>
>
> I was able to simulate the issue again and understand the cause a little
> better.
>
>
>
> The issue occurs when :
>
> - One of the RichMapFunction transformations uses a third party
>   library in the open() method that spawns a thread.
>
> - The thread doesn’t get properly closed in the close() method.
>
> - Once the job starts failing, we start seeing a NoClassDefFound
>   error from that thread.
>
>
>
> I understand that cleanup should be done in the close() method. However,
> just wanted to know, do we have some kind of a configuration setting  which
> would help us clean up such threads ?
>
> I can attach the code if required.
>
>
>
> Thanks,
>
> Subbu
>
>
>
> *From:* Zhu Zhu [mailto:reed...@gmail.com ]
> *Sent:* Friday, August 9, 2019 7:43 AM
> *To:* Subramanyam Ramanathan 
> *Cc:* user@flink.apache.org
> *Subject:* Re: NoClassDefFoundError in failing-restarting job that uses
> url classloader
>
>
>
> Hi Subramanyam,
>
>
>
> Could you share more information? including:
>
> 1. the URL pattern
>
> 2. the detailed exception and the log around it
>
> 3. the cluster the job is running on, e.g. standalone, yarn, k8s
>
> 4. it's session mode or per job mode
>
>
>
> This information would be helpful to identify the failure cause.
>
>
>
> Thanks,
>
> Zhu Zhu
>
> On Fri, Aug 9, 2019 at 1:45 AM Subramanyam Ramanathan wrote:
>
>
>
> Hello,
>
>
>
> I'm currently using flink 1.7.2.
>
>

Re: NoClassDefFoundError in failing-restarting job that uses url classloader

2019-09-24 Thread Zhu Zhu
Hi Subramanyam,

I think you do not need the fix in FLINK-10455, which is for Kafka only.
It is just an issue similar to the one you hit.
As you said, we need to make sure that threads spawned by the operator/UDF
are stopped in the close() method. That way, no thread is left running to
throw NoClassDefFoundError after the class loader has been closed.
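
As a rough sketch of that pattern, here is a plain-Java stand-in for a rich function's open()/close() lifecycle. The class name and the background task are assumptions for illustration; if a third-party library owns the thread, its own shutdown call would take the place of the executor shutdown shown here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-Java stand-in for a rich function's open()/close() lifecycle. The
// class name and the background task are assumptions for illustration.
final class ThreadOwningFunctionSketch {

    private ExecutorService worker;

    // Called once before processing starts: spawn the helper thread here.
    void open() {
        worker = Executors.newSingleThreadExecutor();
        worker.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(10); // placeholder for real background work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore flag, exit loop
                }
            }
        });
    }

    // Called on shutdown or failure: interrupt the thread and wait for it to
    // exit, so nothing keeps running after the class loader is closed.
    // Returns true if the thread terminated within the timeout.
    boolean close() {
        worker.shutdownNow();
        try {
            return worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ThreadOwningFunctionSketch fn = new ThreadOwningFunctionSketch();
        fn.open();
        System.out.println("stopped cleanly: " + fn.close());
    }
}
```

Waiting for termination matters as much as requesting it: a thread that is still winding down after close() returns can hit the same error.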

Thanks,
Zhu Zhu


On Tue, Sep 24, 2019 at 8:07 PM Subramanyam Ramanathan wrote:

> Hi,
>
>
>
> Thank you.
>
> I think the takeaway for us is that we need to make sure that the threads
> are stopped in the close() method.
>
>
>
> With regard to FLINK-10455, I see that the fix versions say : 1.5.6,
> 1.7.0, 1.7.3, 1.8.1, 1.9.0
>
>
>
> However, I’m unable to find 1.7.3 in the downloads page(
> https://flink.apache.org/downloads.html). Is it yet to be released, or
> perhaps I am not looking in the right place ?
>
> We’re currently using 1.7.2. Could you please let me know what is the
> minimal upgrade for me to consume the fix for FLINK-10455 ?
>
>
>
> Thanks,
>
> Subbu
>
>
>
> *From:* Dian Fu [mailto:dian0511...@gmail.com]
> *Sent:* Monday, September 23, 2019 1:54 PM
> *To:* Subramanyam Ramanathan 
> *Cc:* Zhu Zhu ; user@flink.apache.org
> *Subject:* Re: NoClassDefFoundError in failing-restarting job that uses
> url classloader
>
>
>
> Hi Subbu,
>
>
>
> The issue you encountered is very similar to the issue which has been
> fixed in FLINK-10455 [1]. Could you check if that fix could solve your
> problem? The root cause for that issue is that the method close() has not
> closed all things. After the method "close()" is called, the classloader
> (URLClassloader) will be closed. If there is thread still running after
> "close()" method is called, it may access the classes in user provided
> jars. However, as the URLClassloader has already been closed,
> NoClassDefFoundError will be thrown.
>
>
>
> Regards,
>
> Dian
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-10455
>
>
>
> On Sep 23, 2019, at 2:50 PM, Subramanyam Ramanathan <
> subramanyam.ramanat...@microfocus.com> wrote:
>
>
>
> Hi,
>
>
>
> I was able to simulate the issue again and understand the cause a little
> better.
>
>
>
> The issue occurs when :
>
> - One of the RichMapFunction transformations uses a third party
>   library in the open() method that spawns a thread.
>
> - The thread doesn’t get properly closed in the close() method.
>
> - Once the job starts failing, we start seeing a NoClassDefFound
>   error from that thread.
>
>
>
> I understand that cleanup should be done in the close() method. However,
> just wanted to know, do we have some kind of a configuration setting  which
> would help us clean up such threads ?
>
> I can attach the code if required.
>
>
>
> Thanks,
>
> Subbu
>
>
>
> *From:* Zhu Zhu [mailto:reed...@gmail.com ]
> *Sent:* Friday, August 9, 2019 7:43 AM
> *To:* Subramanyam Ramanathan 
> *Cc:* user@flink.apache.org
> *Subject:* Re: NoClassDefFoundError in failing-restarting job that uses
> url classloader
>
>
>
> Hi Subramanyam,
>
>
>
> Could you share more information? including:
>
> 1. the URL pattern
>
> 2. the detailed exception and the log around it
>
> 3. the cluster the job is running on, e.g. standalone, yarn, k8s
>
> 4. it's session mode or per job mode
>
>
>
> This information would be helpful to identify the failure cause.
>
>
>
> Thanks,
>
> Zhu Zhu
>
> On Fri, Aug 9, 2019 at 1:45 AM Subramanyam Ramanathan wrote:
>
>
>
> Hello,
>
>
>
> I'm currently using flink 1.7.2.
>
>
>
> I'm trying to run a job that's submitted programmatically using the
> ClusterClient API.
>
>public JobSubmissionResult run(PackagedProgram prog, int
> parallelism)
>
>
>
>
>
> The job makes use of some jars which I add to the packaged program through
> the Packaged constructor, along with the Jar file.
>
>public PackagedProgram(File jarFile, List classpaths, String...
> args)
>
> Normally, This works perfectly and the job runs fine.
>
>
>
> However, if there's an error in the job, and the job goes into failing
> state and when it's continuously trying to restart the job for an hour or
> so, I notice a NoClassDefFoundError for some classes in the jars that I
> load using the URL class loader and the job never recovers after that, even
> if the root cause of the issue was fixed (I had a kafka source/sink in my
> job, and kafka was down temporarily, and was brought up after that).
>
> The jar is still available at the path referenced by the url classloader
> and is not tampered with.
>
>
>
> Could anyone please give me some pointers with regard to the reason why
> this could happen/what I could be missing here/how can I debug further ?
>
>
>
> thanks
>
> Subbu
>
>
>


Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-23 Thread Zhu Zhu
Steven,

As I understand it, a Flink Counter only stores its accumulated count and
reports that value. Are you using an external counter directly?
Maybe the Flink Meter/MeterView is what you need? It stores the count and
calculates the rate, and it reports both its "count" and its "rate" to
external metric services.
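
To illustrate the difference for alerting, here is a small sketch in plain Java. It is not Flink's Meter/MeterView; the class and method names are made up, and the "window" is simply the time between two reads.

```java
// Sketch of why alerting on a cumulative value differs from alerting on a
// per-window change. This is not Flink's Meter/MeterView; names are made up.
final class RestartAlertSketch {

    private long totalRestarts;       // cumulative, never decreases
    private long countAtWindowStart;  // snapshot taken when the window began

    void markRestart() {
        totalRestarts++;
    }

    // What a cumulative metric reports.
    long total() {
        return totalRestarts;
    }

    // Restarts seen since the window began; then start a new window.
    long restartsInWindowAndReset() {
        long delta = totalRestarts - countAtWindowStart;
        countAtWindowStart = totalRestarts;
        return delta;
    }

    public static void main(String[] args) {
        RestartAlertSketch m = new RestartAlertSketch();
        m.markRestart();
        m.markRestart();
        System.out.println(m.total() + " " + m.restartsInWindowAndReset()); // 2 2
        // A quiet window: the total stays at 2, the per-window delta is 0.
        System.out.println(m.total() + " " + m.restartsInWindowAndReset()); // 2 0
    }
}
```

A threshold on the total stays triggered forever after the first restarts, while a threshold on the per-window delta clears again once the job is stable.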

The counter "task_failures" only works if the individual failover strategy
is enabled. However, it is not a public interface and using it is not
recommended, as fine-grained recovery (region failover) now supersedes it.
I've opened a ticket [1] to add a metric that shows failovers and respects
fine-grained recovery.

[1] https://issues.apache.org/jira/browse/FLINK-14164

Thanks,
Zhu Zhu

On Tue, Sep 24, 2019 at 6:41 AM Steven Wu wrote:

>
> When we setup alert like "fullRestarts > 1" for some rolling window, we
> want to use counter. if it is a Gauge, "fullRestarts" will never go below 1
> after a first full restart. So alert condition will always be true after
> first job restart. If we can apply a derivative to the Gauge value, I guess
> alert can probably work. I can explore if that is an option or not.
>
> Yeah. Understood that "fullRestart" won't increment when fine grained
> recovery happened. I think "task_failures" counter already exists in Flink.
>
>
>
> On Sun, Sep 22, 2019 at 7:59 PM Zhu Zhu  wrote:
>
>> Steven,
>>
>> Thanks for the information. If we can determine this a common issue, we
>> can solve it in Flink core.
>> To get to that state, I have two questions which need your help:
>> 1. Why is gauge not good for alerting? The metric "fullRestart" is a
>> Gauge. Does the metric reporter you use report Counter and
>> Gauge to external services in different ways? Or anything else can be
>> different due to the metric type?
>> 2. Is the "number of restarts" what you actually need, rather than
>> the "fullRestart" count? If so, I believe we will have such a counter
>> metric in 1.10, since the previous "fullRestart" metric value is not the
>> number of restarts when grained recovery (feature added 1.9.0) is enabled.
>> "fullRestart" reveals how many times entire job graph has been
>> restarted. If grained recovery (feature added 1.9.0) is enabled, the graph
>> would not be restarted when task failures happen and the "fullRestart"
>> value will not increment in such cases.
>>
>> I'd appreciate if you can help with these questions and we can make
>> better decisions for Flink.
>>
>> Thanks,
>> Zhu Zhu
>>
>> On Sun, Sep 22, 2019 at 3:31 AM Steven Wu wrote:
>>
>>> Zhu Zhu,
>>>
>>> Flink fullRestart metric is a Gauge, which is not good for alerting on.
>>> We publish an equivalent Counter metric for alerting purpose.
>>>
>>> Thanks,
>>> Steven
>>>
>>> On Thu, Sep 19, 2019 at 7:45 PM Zhu Zhu  wrote:
>>>
>>>> Thanks Steven for the feedback!
>>>> Could you share more information about the metrics you add in your
>>>> customized restart strategy?
>>>>
>>>> Thanks,
>>>> Zhu Zhu
>>>>
>>>> On Fri, Sep 20, 2019 at 7:11 AM Steven Wu wrote:
>>>>
>>>>> We do use config like "restart-strategy:
>>>>> org.foobar.MyRestartStrategyFactoryFactory". Mainly to add additional
>>>>> metrics than the Flink provided ones.
>>>>>
>>>>> On Thu, Sep 19, 2019 at 4:50 AM Zhu Zhu  wrote:
>>>>>
>>>>>> Thanks everyone for the input.
>>>>>>
>>>>>> The RestartStrategy customization is not recognized as a public
>>>>>> interface as it is not explicitly documented.
>>>>>> As the feedback to this survey shows it is not used, I'll conclude
>>>>>> that we do not need to support customized RestartStrategy for the new
>>>>>> scheduler in Flink 1.10.
>>>>>>
>>>>>> Other usages are still supported, including all the strategies and
>>>>>> configuring ways described in
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/task_failure_recovery.html#restart-strategies
>>>>>> .
>>>>>>
>>>>>> Feel free to share in this thread if you have any concerns about it.
>>>>>>
>>>>>> Thanks,
>>>>>> Zhu Zhu
>>>>>>
>>>>>> On Thu, Sep 12, 2019 at 10:33 PM Zhu Zhu wrote:
>>>>>>
>>>>>>> Thanks Oytun for the 

Re: Is there a lifecycle listener that gets notified when a topology starts/stops on a task manager

2019-09-23 Thread Zhu Zhu
Hi Stephen,

I think disposing of static components in the closing stage of a task is
required.
This is because your code (operators/UDFs) is part of the task, which means
it can only be executed while the task has not yet been disposed.
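
One way to do that with a shared singleton is simple reference counting from open() and close(). The sketch below is plain Java under stated assumptions: a bare Object stands in for the third-party singleton, and all names are hypothetical.

```java
// Sketch: share one static resource across all tasks in a JVM and release it
// when the last task closes. A bare Object stands in for the third-party
// singleton; all names here are hypothetical.
final class SharedResourceSketch {

    private static int users = 0;
    private static Object resource = null;

    // Call from every operator's open().
    static synchronized Object acquire() {
        if (users++ == 0) {
            resource = new Object(); // placeholder for the library's init call
        }
        return resource;
    }

    // Call from every operator's close().
    static synchronized void release() {
        if (users > 0 && --users == 0) {
            resource = null; // placeholder for the library's shutdown call
        }
    }

    static synchronized boolean isInitialized() {
        return resource != null;
    }

    public static void main(String[] args) {
        acquire();                            // first task opens: resource created
        acquire();                            // second task opens: resource reused
        release();                            // first task closes: still in use
        System.out.println(isInitialized());  // true
        release();                            // last task closes: resource disposed
        System.out.println(isInitialized());  // false
    }
}
```

This still means hooking into open() and close() of the rich functions, but the counting logic lives in one place instead of being repeated in each operator.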

Thanks,
Zhu Zhu

On Tue, Sep 24, 2019 at 2:13 AM Stephen Connolly wrote:

> Currently the best I can see is to make *everything* a Rich... and hook
> into the open and close methods... but feels very ugly.
>
>
>
> On Mon 23 Sep 2019 at 15:45, Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
>> We are using a 3rd party library that allocates some resources in one of
>> our topologies.
>>
>> Is there a listener or something that gets notified when the topology
>> starts / stops running in the Task Manager's JVM?
>>
>> The 3rd party library uses a singleton, so I need to initialize the
>> singleton when the first task is started on the task manager and clear out
>> the singleton when the last task is stopped in order to allow the topology
>> classloader to be unloadable.
>>
>> I had thought it could all be done from the Topology's main method, but
>> after much head-banging we were able to identify that *when run on a
>> distributed cluster* the main method is not invoked to start the topology
>> for each task manager.
>>
> --
> Sent from my phone
>


Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-22 Thread Zhu Zhu
Steven,

Thanks for the information. If we can determine that this is a common issue,
we can solve it in Flink core.
To get to that state, I have two questions which need your help:
1. Why is a Gauge not good for alerting? The metric "fullRestart" is a
Gauge. Does the metric reporter you use report Counter and
Gauge to external services in different ways? Or can anything else
differ due to the metric type?
2. Is the "number of restarts" what you actually need, rather than
the "fullRestart" count? If so, I believe we will have such a counter
metric in 1.10, since the existing "fullRestart" metric value is not the
number of restarts when fine-grained recovery (a feature added in 1.9.0) is
enabled. "fullRestart" reveals how many times the entire job graph has been
restarted. If fine-grained recovery is enabled, the graph
would not be restarted when task failures happen and the "fullRestart"
value will not increment in such cases.
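
The difference can be illustrated with a toy model in plain Java. It is not
Flink code: RestartMetricsSketch, onTaskFailure() and both field names are
hypothetical, and it assumes exactly the behavior described above, namely
that a task failure restarts the whole graph only when fine-grained recovery
is disabled.

```java
/** Toy model of the two restart metrics discussed above; not Flink code. */
final class RestartMetricsSketch {

    private final boolean fineGrainedRecovery;
    private long fullRestarts; // mirrors what "fullRestart" is described to report
    private long numRestarts;  // hypothetical "number of restarts" counter

    RestartMetricsSketch(boolean fineGrainedRecovery) {
        this.fineGrainedRecovery = fineGrainedRecovery;
    }

    /** Records one task failure that leads to a restart. */
    void onTaskFailure() {
        numRestarts++;
        if (!fineGrainedRecovery) {
            // Without fine-grained recovery the entire job graph is restarted.
            fullRestarts++;
        }
    }

    long fullRestarts() {
        return fullRestarts;
    }

    long numRestarts() {
        return numRestarts;
    }

    public static void main(String[] args) {
        RestartMetricsSketch job = new RestartMetricsSketch(true);
        for (int i = 0; i < 3; i++) {
            job.onTaskFailure();
        }
        System.out.println(
                "fullRestarts=" + job.fullRestarts() + ", numRestarts=" + job.numRestarts());
    }
}
```

In this model, three task failures with fine-grained recovery enabled leave
fullRestarts at 0 while numRestarts reaches 3, which is why alerting on
"fullRestart" alone could miss them.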

I'd appreciate it if you could help with these questions so that we can make
better decisions for Flink.

Thanks,
Zhu Zhu

On Sun, Sep 22, 2019 at 3:31 AM Steven Wu wrote:

> Zhu Zhu,
>
> Flink fullRestart metric is a Gauge, which is not good for alerting on. We
> publish an equivalent Counter metric for alerting purpose.
>
> Thanks,
> Steven
>
> On Thu, Sep 19, 2019 at 7:45 PM Zhu Zhu  wrote:
>
>> Thanks Steven for the feedback!
>> Could you share more information about the metrics you add in you
>> customized restart strategy?
>>
>> Thanks,
>> Zhu Zhu
>>
>> On Fri, Sep 20, 2019 at 7:11 AM Steven Wu wrote:
>>
>>> We do use config like "restart-strategy:
>>> org.foobar.MyRestartStrategyFactoryFactory". Mainly to add additional
>>> metrics than the Flink provided ones.
>>>
>>> On Thu, Sep 19, 2019 at 4:50 AM Zhu Zhu  wrote:
>>>
>>>> Thanks everyone for the input.
>>>>
>>>> The RestartStrategy customization is not recognized as a public
>>>> interface as it is not explicitly documented.
>>>> As it is not used from the feedbacks of this survey, I'll conclude that
>>>> we do not need to support customized RestartStrategy for the new scheduler
>>>> in Flink 1.10
>>>>
>>>> Other usages are still supported, including all the strategies and
>>>> configuring ways described in
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/task_failure_recovery.html#restart-strategies
>>>> .
>>>>
>>>> Feel free to share in this thread if you has any concern for it.
>>>>
>>>> Thanks,
>>>> Zhu Zhu
>>>>
>>>> On Thu, Sep 12, 2019 at 10:33 PM Zhu Zhu wrote:
>>>>
>>>>> Thanks Oytun for the reply!
>>>>>
>>>>> Sorry for not have stated it clearly. When saying "customized
>>>>> RestartStrategy", we mean that users implement an
>>>>> *org.apache.flink.runtime.executiongraph.restart.RestartStrategy* by
>>>>> themselves and use it by configuring like "restart-strategy:
>>>>> org.foobar.MyRestartStrategyFactoryFactory".
>>>>>
>>>>> The usage of restart strategies you mentioned will keep working with
>>>>> the new scheduler.
>>>>>
>>>>> Thanks,
>>>>> Zhu Zhu
>>>>>
>>>>> On Thu, Sep 12, 2019 at 10:05 PM Oytun Tez wrote:
>>>>>
>>>>>> Hi Zhu,
>>>>>>
>>>>>> We are using custom restart strategy like this:
>>>>>>
>>>>>> environment.setRestartStrategy(failureRateRestart(2, Time.minutes(1),
>>>>>> Time.minutes(10)));
>>>>>>
>>>>>>
>>>>>> ---
>>>>>> Oytun Tez
>>>>>>
>>>>>> *M O T A W O R D*
>>>>>> The World's Fastest Human Translation Platform.
>>>>>> oy...@motaword.com — www.motaword.com
>>>>>>
>>>>>>
>>>>>> On Thu, Sep 12, 2019 at 7:11 AM Zhu Zhu  wrote:
>>>>>>
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>> I wanted to reach out to you and ask how many of you are using a
>>>>>>> customized RestartStrategy[1] in production jobs.
>>>>>>>
>>>>>>> We are currently developing the new Flink scheduler[2] which
>>>>>>> interacts with restart strategies in a different way. We have to 
>>>>>>> re-design
>>>>>>> the interfaces for the new restart strategies (so called
>>>>>>> RestartBackoffTimeStrategy). Existing customized RestartStrategy will 
>>>>>>> not
>>>>>>> work any more with the new scheduler.
>>>>>>>
>>>>>>> We want to know whether we should keep the way
>>>>>>> to customized RestartBackoffTimeStrategy so that existing customized
>>>>>>> RestartStrategy can be migrated.
>>>>>>>
>>>>>>> I'd appreciate if you can share the status if you are
>>>>>>> using customized RestartStrategy. That will be valuable for use to make
>>>>>>> decisions.
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/task_failure_recovery.html#restart-strategies
>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-10429
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Zhu Zhu
>>>>>>>
>>>>>>


Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-19 Thread Zhu Zhu
Thanks Steven for the feedback!
Could you share more information about the metrics you add in your
customized restart strategy?

Thanks,
Zhu Zhu

On Fri, Sep 20, 2019 at 7:11 AM Steven Wu wrote:

> We do use config like "restart-strategy:
> org.foobar.MyRestartStrategyFactoryFactory". Mainly to add additional
> metrics than the Flink provided ones.
>
> On Thu, Sep 19, 2019 at 4:50 AM Zhu Zhu  wrote:
>
>> Thanks everyone for the input.
>>
>> The RestartStrategy customization is not recognized as a public interface
>> as it is not explicitly documented.
>> As it is not used from the feedbacks of this survey, I'll conclude that
>> we do not need to support customized RestartStrategy for the new scheduler
>> in Flink 1.10
>>
>> Other usages are still supported, including all the strategies and
>> configuring ways described in
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/task_failure_recovery.html#restart-strategies
>> .
>>
>> Feel free to share in this thread if you has any concern for it.
>>
>> Thanks,
>> Zhu Zhu
>>
>> On Thu, Sep 12, 2019 at 10:33 PM Zhu Zhu wrote:
>>
>>> Thanks Oytun for the reply!
>>>
>>> Sorry for not have stated it clearly. When saying "customized
>>> RestartStrategy", we mean that users implement an
>>> *org.apache.flink.runtime.executiongraph.restart.RestartStrategy* by
>>> themselves and use it by configuring like "restart-strategy:
>>> org.foobar.MyRestartStrategyFactoryFactory".
>>>
>>> The usage of restart strategies you mentioned will keep working with the
>>> new scheduler.
>>>
>>> Thanks,
>>> Zhu Zhu
>>>
>>> On Thu, Sep 12, 2019 at 10:05 PM Oytun Tez wrote:
>>>
>>>> Hi Zhu,
>>>>
>>>> We are using custom restart strategy like this:
>>>>
>>>> environment.setRestartStrategy(failureRateRestart(2, Time.minutes(1),
>>>> Time.minutes(10)));
>>>>
>>>>
>>>> ---
>>>> Oytun Tez
>>>>
>>>> *M O T A W O R D*
>>>> The World's Fastest Human Translation Platform.
>>>> oy...@motaword.com — www.motaword.com
>>>>
>>>>
>>>> On Thu, Sep 12, 2019 at 7:11 AM Zhu Zhu  wrote:
>>>>
>>>>> Hi everyone,
>>>>>
>>>>> I wanted to reach out to you and ask how many of you are using a
>>>>> customized RestartStrategy[1] in production jobs.
>>>>>
>>>>> We are currently developing the new Flink scheduler[2] which interacts
>>>>> with restart strategies in a different way. We have to re-design the
>>>>> interfaces for the new restart strategies (so called
>>>>> RestartBackoffTimeStrategy). Existing customized RestartStrategy will not
>>>>> work any more with the new scheduler.
>>>>>
>>>>> We want to know whether we should keep the way
>>>>> to customized RestartBackoffTimeStrategy so that existing customized
>>>>> RestartStrategy can be migrated.
>>>>>
>>>>> I'd appreciate if you can share the status if you are using customized
>>>>> RestartStrategy. That will be valuable for use to make decisions.
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/task_failure_recovery.html#restart-strategies
>>>>> [2] https://issues.apache.org/jira/browse/FLINK-10429
>>>>>
>>>>> Thanks,
>>>>> Zhu Zhu
>>>>>
>>>>


Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-19 Thread Zhu Zhu
Thanks everyone for the input.

The RestartStrategy customization is not recognized as a public interface
as it is not explicitly documented.
As the feedback from this survey shows that it is not used, I'll conclude
that we do not need to support customized RestartStrategy for the new
scheduler in Flink 1.10.

Other usages are still supported, including all the strategies and
configuration methods described in
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/task_failure_recovery.html#restart-strategies
.

Feel free to share in this thread if you have any concerns about it.

Thanks,
Zhu Zhu

On Thu, Sep 12, 2019 at 10:33 PM Zhu Zhu wrote:

> Thanks Oytun for the reply!
>
> Sorry for not have stated it clearly. When saying "customized
> RestartStrategy", we mean that users implement an
> *org.apache.flink.runtime.executiongraph.restart.RestartStrategy* by
> themselves and use it by configuring like "restart-strategy:
> org.foobar.MyRestartStrategyFactoryFactory".
>
> The usage of restart strategies you mentioned will keep working with the
> new scheduler.
>
> Thanks,
> Zhu Zhu
>
> On Thu, Sep 12, 2019 at 10:05 PM Oytun Tez wrote:
>
>> Hi Zhu,
>>
>> We are using custom restart strategy like this:
>>
>> environment.setRestartStrategy(failureRateRestart(2, Time.minutes(1),
>> Time.minutes(10)));
>>
>>
>> ---
>> Oytun Tez
>>
>> *M O T A W O R D*
>> The World's Fastest Human Translation Platform.
>> oy...@motaword.com — www.motaword.com
>>
>>
>> On Thu, Sep 12, 2019 at 7:11 AM Zhu Zhu  wrote:
>>
>>> Hi everyone,
>>>
>>> I wanted to reach out to you and ask how many of you are using a
>>> customized RestartStrategy[1] in production jobs.
>>>
>>> We are currently developing the new Flink scheduler[2] which interacts
>>> with restart strategies in a different way. We have to re-design the
>>> interfaces for the new restart strategies (so called
>>> RestartBackoffTimeStrategy). Existing customized RestartStrategy will not
>>> work any more with the new scheduler.
>>>
>>> We want to know whether we should keep the way
>>> to customized RestartBackoffTimeStrategy so that existing customized
>>> RestartStrategy can be migrated.
>>>
>>> I'd appreciate if you can share the status if you are using customized
>>> RestartStrategy. That will be valuable for use to make decisions.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/task_failure_recovery.html#restart-strategies
>>> [2] https://issues.apache.org/jira/browse/FLINK-10429
>>>
>>> Thanks,
>>> Zhu Zhu
>>>
>>


Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-12 Thread Zhu Zhu
Thanks Oytun for the reply!

Sorry for not having stated it clearly. By "customized
RestartStrategy", we mean that users implement an
*org.apache.flink.runtime.executiongraph.restart.RestartStrategy*
themselves and use it via a configuration like "restart-strategy:
org.foobar.MyRestartStrategyFactoryFactory".

The usage of restart strategies you mentioned will keep working with the
new scheduler.

Thanks,
Zhu Zhu

On Thu, Sep 12, 2019 at 10:05 PM Oytun Tez wrote:

> Hi Zhu,
>
> We are using custom restart strategy like this:
>
> environment.setRestartStrategy(failureRateRestart(2, Time.minutes(1),
> Time.minutes(10)));
>
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> On Thu, Sep 12, 2019 at 7:11 AM Zhu Zhu  wrote:
>
>> Hi everyone,
>>
>> I wanted to reach out to you and ask how many of you are using a
>> customized RestartStrategy[1] in production jobs.
>>
>> We are currently developing the new Flink scheduler[2] which interacts
>> with restart strategies in a different way. We have to re-design the
>> interfaces for the new restart strategies (so called
>> RestartBackoffTimeStrategy). Existing customized RestartStrategy will not
>> work any more with the new scheduler.
>>
>> We want to know whether we should keep the way
>> to customized RestartBackoffTimeStrategy so that existing customized
>> RestartStrategy can be migrated.
>>
>> I'd appreciate if you can share the status if you are using customized
>> RestartStrategy. That will be valuable for use to make decisions.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/task_failure_recovery.html#restart-strategies
>> [2] https://issues.apache.org/jira/browse/FLINK-10429
>>
>> Thanks,
>> Zhu Zhu
>>
>


[SURVEY] How many people are using customized RestartStrategy(s)

2019-09-12 Thread Zhu Zhu
Hi everyone,

I wanted to reach out to you and ask how many of you are using a customized
RestartStrategy[1] in production jobs.

We are currently developing the new Flink scheduler[2] which interacts
with restart strategies in a different way. We have to re-design the
interfaces for the new restart strategies (so-called
RestartBackoffTimeStrategy). Existing customized RestartStrategy
implementations will no longer work with the new scheduler.

We want to know whether we should keep a way
to customize RestartBackoffTimeStrategy so that existing customized
RestartStrategy implementations can be migrated.

I'd appreciate it if you could share your status if you are using a
customized RestartStrategy. That will be valuable for us in making decisions.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/task_failure_recovery.html#restart-strategies
[2] https://issues.apache.org/jira/browse/FLINK-10429

Thanks,
Zhu Zhu


Re: [ANNOUNCE] Zili Chen becomes a Flink committer

2019-09-11 Thread Zhu Zhu
Congratulations Zili!

Thanks,
Zhu Zhu

On Wed, Sep 11, 2019 at 5:34 PM Terry Wang wrote:

> Congratulations!
>
> Best,
> Terry Wang
>
>
>
> On Sep 11, 2019, at 5:28 PM, Dian Fu wrote:
>
> Congratulations!
>
> On Sep 11, 2019, at 5:26 PM, Jeff Zhang wrote:
>
> Congratulations Zili Chen!
>
> On Wed, Sep 11, 2019 at 5:25 PM Wesley Peng wrote:
>
>> Hi
>>
>> on 2019/9/11 17:22, Till Rohrmann wrote:
>> > I'm very happy to announce that Zili Chen (some of you might also know
>> > him as Tison Kun) accepted the offer of the Flink PMC to become a
>> > committer of the Flink project.
>>
>> Congratulations Zili Chen.
>>
>> regards.
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>
>


Re: Post-processing batch JobExecutionResult

2019-09-06 Thread Zhu Zhu
Hi spoganshev,

The *OptimizerPlanEnvironment* is for creating the optimized plan only, as
described in the javadoc
"An {@link ExecutionEnvironment} that never executes a job but only creates
the optimized plan."
Its execute() is invoked with some internal handling so that it only
generates the optimized plan and does not actually submit a job.
Some other execution environment will execute the job instead.

How did you create your ExecutionEnvironment?
Usually for DataSet jobs, it should be created as below.
"final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();"

Thanks,
Zhu Zhu

On Fri, Sep 6, 2019 at 11:39 PM spoganshev wrote:

> Due to OptimizerPlanEnvironment.execute() throwing exception on the last
> line
> there is not way to post-process batch job execution result, like:
>
> JobExecutionResult r = env.execute(); // execute batch job
> analyzeResult(r); // this will never get executed due to plan optimization
>
>
> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L54
>
> Is there any way to allow such post-processing in batch jobs?
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: error in LocalStreamEnvironment

2019-09-05 Thread Zhu Zhu
I checked the demo and found that it is using a quite outdated Flink
version (0.10.0).
You are trying to run it with Flink 1.7.2.
That's why the NoClassDefFound error happens.

I'd suggest you try the examples in the current Flink repo.

References:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/getting-started/examples/
https://ci.apache.org/projects/flink/flink-docs-release-1.9/getting-started/tutorials/local_setup.html

Thanks,
Zhu Zhu

On Thu, Sep 5, 2019 at 3:10 PM alaa wrote:

> thank for your reply but Unfortunately this solution is not suitable
>
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1965/Screenshot_from_2019-09-05_09-09-12.png>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: error in LocalStreamEnvironment

2019-09-05 Thread Zhu Zhu
Hi alaa,

I think this demo is intended to run in an IDE only.
The demo runs in *DemoStreamEnvironment*, which extends
*LocalStreamEnvironment*.
And *LocalStreamEnvironment* cannot be used when submitting a program
through a client, as the exception says.

If you want to run this demo with the Flink client, I think you can change
"val env: StreamExecutionEnvironment = DemoStreamEnvironment.env" to "val
env = StreamExecutionEnvironment.getExecutionEnvironment"
in the demo code, e.g. in TotalArrivalCount.scala.

Thanks,
Zhu Zhu



On Wed, Sep 4, 2019 at 5:18 PM alaa wrote:

> Hallo
>
>
> I try to run this example from GitHub
> https://github.com/dataArtisans/flink-streaming-demo but there was a
> problem
> when I try to run it on Flink to see the result on web dashboard
>
> Although the code  working properly on my Intellj
>
> So, how can I solve this problem ?
>
>
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1965/Screenshot_from_2019-09-04_11-12-32.png>
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: [SURVEY] Is the default restart delay of 0s causing problems?

2019-09-02 Thread Zhu Zhu
1s looks good to me.
And I think the conclusion about when a user should override the delay is
worth documenting.

Thanks,
Zhu Zhu

On Tue, Sep 3, 2019 at 4:42 AM Steven Wu wrote:

> 1s sounds a good tradeoff to me.
>
> On Mon, Sep 2, 2019 at 1:30 PM Till Rohrmann  wrote:
>
>> Thanks a lot for all your feedback. I see there is a slight tendency
>> towards having a non zero default delay so far.
>>
>> However, Yu has brought up some valid points. Maybe I can shed some light
>> on a).
>>
>> Before FLINK-9158 we set the default delay to 10s because Flink did not
>> support queued scheduling which meant that if one slot was missing/still
>> being occupied, then Flink would fail right away with
>> a NoResourceAvailableException. In order to prevent this we added the
>> delay. This also covered the case when the job was failing because of an
>> overloaded external system.
>>
>> When we finished FLIP-6, we thought that we could improve the user
>> experience by decreasing the default delay to 0s because all Flink related
>> problems (slot still occupied, slot missing because of reconnecting TM)
>> could be handled by the default slot request time out which allowed the
>> slots to become ready after the scheduling was kicked off. However, we did
>> not properly take the case of overloaded external systems into account.
>>
>> For b) I agree that any default value should be properly documented. This
>> was clearly an oversight when FLINK-9158 was merged. Moreover, I
>> believe that there won't be a one-size-fits-all default value. There are
>> always cases where one needs to adapt it to one's needs. But this is ok. The
>> goal should be to find the default value which works for most cases.
>>
>> So maybe the middle ground between 10s and 0s could be a solution.
>> Setting the default restart delay to 1s should prevent restart storms
>> caused by overloaded external systems and still be fast enough to not slow
>> down recoveries noticeably in most cases. If one needs a super fast
>> recovery, then one should set the delay value to 0s. If one requires a
>> longer delay because of a particular infrastructure, then one needs to
>> change the value too. What do you think?
>>
>> Cheers,
>> Till
>>
>> On Sun, Sep 1, 2019 at 11:56 PM Yu Li  wrote:
>>
>>> -1 on increasing the default delay to non-zero, for the reasons below:
>>>
>>> a) I could see some concerns about setting the delay to zero in the very
>>> original JIRA (FLINK-2993
>>> <https://issues.apache.org/jira/browse/FLINK-2993>) but later on in
>>> FLINK-9158 <https://issues.apache.org/jira/browse/FLINK-9158> we still
>>> decided to make the change, so I'm wondering whether the decision also came
>>> from any customer requirement? If so, how could we judge whether one
>>> requirement override the other?
>>>
>>> b) There could be valid reasons for both default values depending on
>>> different use cases, as well as relative work around (like based on latest
>>> policy, setting the config manually to 10s could resolve the problem
>>> mentioned), and from former replies to this thread we could see users have
>>> already taken actions. Changing it back to non-zero again won't affect such
>>> users but might cause surprises to those depending on 0 as default.
>>>
>>> Last but not least, no matter what decision we make this time, I'd
>>> suggest to make it final and document in our release note explicitly.
>>> Checking the 1.5.0 release note [1] [2] it seems we didn't mention about
>>> the change on default restart delay and we'd better learn from it this
>>> time. Thanks.
>>>
>>> [1]
>>> https://flink.apache.org/news/2018/05/25/release-1.5.0.html#release-notes
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-notes/flink-1.5.html
>>>
>>> Best Regards,
>>> Yu
>>>
>>>
>>> On Sun, 1 Sep 2019 at 04:33, Steven Wu  wrote:
>>>
>>>> +1 on what Zhu Zhu said.
>>>>
>>>> We also override the default to 10 s.
>>>>
>>>> On Fri, Aug 30, 2019 at 8:58 PM Zhu Zhu  wrote:
>>>>
>>>>> In our production, we usually override the restart delay to be 10 s.
>>>>> We once encountered cases that external services are overwhelmed by
>>>>> reconnections from frequent restarted tasks.
>>>>> As a safer though not optimized option, a default delay larger than 0
>>>>> s is bett

Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-09-01 Thread Zhu Zhu
Hi Elkhan,

>> Regarding "One optimization that we take is letting yarn to reuse the
>> flink-dist jar which was localized when running previous jobs."
>> We are intending to use Flink Real-time pipeline for Replay from
>> Hive/HDFS (from offline source), to have 1 single pipeline for both batch
>> and real-time. So for batch Flink job, the containers will be released
>> once the job is done.
>> I guess your job is real-time flink, so you can share the jars from
>> already long-running jobs.

This optimization is done by making the flink-dist jar a public resource in
the YARN distributed cache.
In this way, the localized dist jar can be shared by different YARN
applications and it will not be removed when the YARN application which
localized it terminates.
This requires some changes in Flink though.
We will open an issue to contribute this optimization to the community.

Thanks,
Zhu Zhu

On Sat, Aug 31, 2019 at 12:57 PM SHI Xiaogang wrote:

> Hi Dadashov,
>
> You may have a look at method YarnResourceManager#onContainersAllocated
> which will launch containers (via NMClient#startContainer) after containers
> are allocated.
> The launching is performed in the main thread of YarnResourceManager and
> the launching is synchronous/blocking. Consequently, the containers will be
> launched one by one.
>
> Regards,
> Xiaogang
>
> On Sat, Aug 31, 2019 at 2:37 AM Elkhan Dadashov wrote:
>
>> Thanks everyone for the valuable input and for sharing your experience in
>> tackling the issue.
>>
>> Regarding the suggestions:
>> - We provision some common jars on all cluster nodes *-->* but this makes
>> us depend on the Infra Team's schedule for handling/updating common jars
>> - Making the Uberjar slimmer *-->* tried even with a 200 MB Uberjar (half
>> size), which did not improve much. Only 100 containers could be started in
>> time, and then we received:
>>
>> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to start container.
>> This token is expired. current time is 1566422713305 found 1566422560552
>> Note: System times on machines may be out of sync. Check system time and time zones.
>>
>>
>> - It would be nice to see FLINK-13184
>> <https://issues.apache.org/jira/browse/FLINK-13184>, but the version it
>> is expected to get into is 1.10
>> - Increase replication factor --> It would be nice to have a Flink conf
>> for setting the replication factor for only the Flink job jars, but not
>> the output. It is also challenging to set a replication factor for a not
>> yet existing directory, as the new files will have the default
>> replication factor. Will explore the HDFS cache option.
>>
>> Maybe another option can be:
>> - Letting yet-to-be-started Task Managers (or NodeManagers) download the
>> jars from already started TaskManagers in P2P fashion, so as not to be
>> blocked on HDFS replication.
>>
>> A Spark job without any tuning, with the exact same size jar and 800
>> executors, can start without any issue on the same cluster in less than a
>> minute.
>>
>> *Further questions:*
>>
>> *@ SHI Xiaogang:*
>>
>> I see that all 800 requests are sent concurrently :
>>
>> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>>  org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
>> container with resources . Number pending requests
>> 793.
>> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>>  org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
>> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
>> nativeMemoryInMB=0, networkMemoryInMB=0} for job
>> e908cb4700d5127a0b67be035e4494f7 with allocation id
>> AllocationID{cb016f7ce1eac1342001ccdb1427ba07}.
>>
>> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>>  org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
>> container with resources . Number pending requests
>> 794.
>> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>>  org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
>> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
>> nativeMemoryInMB=0, networkMemoryInMB=0} for job
>> e908cb4700d5127a0b67be035e4494f7 with allocation id
>> AllocationID{71bbb917374ade66df4c058c41b81f4e}.
>> ...
>>
>> Can you please elaborate the part  "As containers are launched and
>> stopped one after another" ? Any pointer to class/method in Flink?
>>
>> *@ Zhu Zhu:*
>>
>> Regarding "One optimization that we take is letting yarn to reuse the
>> flink-dis

Re: [SURVEY] Is the default restart delay of 0s causing problems?

2019-08-30 Thread Zhu Zhu
In our production, we usually override the restart delay to be 10 s.
We once encountered cases where external services were overwhelmed by
reconnections from frequently restarted tasks.
As a safer, though not optimal, option, a default delay larger than 0 s is
better in my opinion.
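
A rough back-of-the-envelope model of this effect, in plain Java, is sketched
below. It is only an illustration under stated assumptions: the class and
method names are hypothetical, every attempt is assumed to fail after a fixed
timeToFail, and the 200 ms and one-minute values are made up.

```java
import java.time.Duration;

/** Rough model of how many restart attempts hit a failing external service. */
final class RestartStormEstimate {

    /**
     * Counts the restart attempts that start within the outage window, assuming
     * each attempt fails after timeToFail and the next one starts restartDelay later.
     */
    static long attemptsDuringOutage(Duration outage, Duration timeToFail, Duration restartDelay) {
        long cycleMillis = timeToFail.plus(restartDelay).toMillis();
        if (cycleMillis <= 0) {
            throw new IllegalArgumentException("timeToFail + restartDelay must be positive");
        }
        long outageMillis = outage.toMillis();
        // Attempts start at t = 0, cycle, 2 * cycle, ... while t < outage (ceiling division).
        return (outageMillis + cycleMillis - 1) / cycleMillis;
    }

    public static void main(String[] args) {
        Duration outage = Duration.ofMinutes(1);      // assumed outage of the external service
        Duration timeToFail = Duration.ofMillis(200); // assumed time until a task fails again
        for (long delaySeconds : new long[] {0, 1, 10}) {
            long attempts =
                    attemptsDuringOutage(outage, timeToFail, Duration.ofSeconds(delaySeconds));
            System.out.println("delay=" + delaySeconds + "s -> " + attempts + " attempts per task");
        }
    }
}
```

With these assumed numbers, a one-minute outage sees 300 attempts per task at
a 0 s delay, 50 at 1 s and 6 at 10 s, and every attempt means another round of
reconnections from all restarted tasks.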


On Fri, Aug 30, 2019 at 10:23 PM 未来阳光 <2217232...@qq.com> wrote:

> Hi,
>
>
> I think it's better to increase the default value. +1
>
>
> Best.
>
>
>
>
> -- Original Message --
> From: "Till Rohrmann";
> Sent: Friday, August 30, 2019, 10:07 PM
> To: "dev"; "user";
> Subject: [SURVEY] Is the default restart delay of 0s causing problems?
>
>
>
> Hi everyone,
>
> I wanted to reach out to you and ask whether decreasing the default delay
> to `0 s` for the fixed delay restart strategy [1] is causing trouble. A
> user reported that he would like to increase the default value because it
> can cause restart storms in case of systematic faults [2].
>
> The downside of increasing the default delay would be a slightly increased
> restart time if this config option is not explicitly set.
>
> [1] https://issues.apache.org/jira/browse/FLINK-9158
> [2] https://issues.apache.org/jira/browse/FLINK-11218
>
> Cheers,
> Till


Re: best practices on getting flink job logs from Hadoop history server?

2019-08-30 Thread Zhu Zhu
Hi Yu,

Regarding #2,
Currently we search the JM log for the task deployment entries, which
contain the container and machine each task is deployed to.

Regarding #3,
You can find the application logs aggregated per machine on DFS; the path
depends on your YARN config.
Each log may still include multiple TM logs. However, it can be much smaller
than the log generated by "yarn logs ...".

Thanks,
Zhu Zhu

On Fri, Aug 30, 2019 at 3:58 PM Yu Yang wrote:

> Hi,
>
> We run flink jobs through yarn on hadoop clusters. One challenge that we
> are facing is to simplify flink job log access.
>
> The flink job logs can be accessible using "yarn logs $application_id".
> That approach has a few limitations:
>
>1. It is not straightforward to find yarn application id based on
>flink job id.
>2. It is difficult to find the corresponding container id for the
>flink sub tasks.
>3. For jobs that have many tasks, it is inefficient to use "yarn logs
>..."  as it mixes logs from all task managers.
>
> Any suggestions on the best practice to get logs for completed flink job
> that run on yarn?
>
> Regards,
> -Yu
>
>
>


Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-08-30 Thread Zhu Zhu
One optimization that we use is letting YARN reuse the flink-dist jar
that was localized when running previous jobs.

Thanks,
Zhu Zhu

On Fri, Aug 30, 2019 at 4:02 PM Jörn Franke wrote:

> Increase replication factor and/or use HDFS cache
> https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html
> Try to reduce the size of the Jar, eg the Flink libraries do not need to
> be included.
>
> On Aug 30, 2019, at 01:09, Elkhan Dadashov wrote:
>
> Dear Flink developers,
>
> I am having difficulty getting a Flink job started.
>
> The job's uberjar/fat jar is around 400MB, and I need to kick off 800+
> containers.
>
> The default HDFS replication is 3.
>
> *The Yarn queue is empty, and 800 containers are allocated
> almost immediately by the Yarn RM.*
>
> It takes a very long time until all 800 nodes (node managers) have
> downloaded the Uberjar from HDFS to their local machines.
>
> *Q1:*
>
> a) Do all those 800 nodes download in batches of 3 at a time? (batch
> size = HDFS replication factor)
>
> b) Or can Flink TMs replicate from each other? Or do already started
> TMs replicate to not-yet-started nodes?
>
> Most probably the answer is (a), but I want to confirm.
>
> *Q2:*
>
> What is the recommended way of handling a 400MB+ Uberjar with 800+
> containers?
>
> Any specific params to tune?
>
> Thanks.
>
> Because downloading the Uberjar takes a really long time, we face this
> exception around 15 minutes after the job is kicked off:
>
> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to start container.
> This token is expired. current time is 1567116179193 found 1567116001610
> Note: System times on machines may be out of sync. Check system time and time zones.
>   at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown Source)
>   at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
>   at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
>   at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
>   at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>   at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>   at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>   at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
>


Re: Problem upgrading HA set up from 1.7.2 to 1.9.0

2019-08-28 Thread Zhu Zhu
Hi Steven,

The root cause is that the *InputDependencyConstraint* is null.
Did you ever invoke
*ExecutionConfig#setDefaultInputDependencyConstraint(null)* in your job
code?

If not, this should not happen according to current code paths, as the
*InputDependencyConstraint* is initially assigned with a non-null value.
In that case, would you check whether the docker image is storing a
pre-generated legacy (1.7.2) JobGraph, which is not compatible with Flink 1.9?

Thanks,
Zhu Zhu

On Wed, Aug 28, 2019 at 11:23 PM Steven Nelson wrote:

> I am trying to update a cluster running in HA mode from 1.7.2 to 1.9.0. I
> am attempting to just update the docker images to the new ones and restart
> the cluster. Is this something that is supported? or do I need to destroy
> the HA setup and build the cluster from scratch?
>
> Here is the error I get.
> 2019-08-28T15:19:21.287+ ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error
> occurred in the cluster entrypoint.
> org.apache.flink.runtime.dispatcher.DispatcherException: Failed to take
> leadership with session id 2f8f7919-a81b-4529-ad57-9789dbf07707.
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$30(Dispatcher.java:915)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.concurrent.FutureUtils$WaitingConjunctFuture.handleCompletedFuture(FutureUtils.java:691)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:739)
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException:
> org.apache.flink.runtime.client.JobExecutionException: Could not set up
> JobManager
> at
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(Fork

Re: Using shell environment variables

2019-08-24 Thread Zhu Zhu
Hi Abhishek,

You need to export the environment variables on all the worker machines (not
on the machine that submits the job).

Alternatively, if you are submitting the job to a YARN cluster, you can use
the Flink config prefix "containerized.taskmanager.env." to add environment
variables to Flink's task manager process.
For example, to pass LD_LIBRARY_PATH as an env variable to the workers,
set containerized.taskmanager.env.LD_LIBRARY_PATH: "/usr/lib/native" in
flink-conf.yaml.
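
If several variables are needed (for instance the API_TOKEN from the original
question), the entries can be generated rather than typed by hand. The
following is only a minimal sketch: TaskManagerEnvConf and toConfLines are
hypothetical names, not part of Flink, and values containing quotes are not
escaped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper (not part of Flink): renders env variables as flink-conf.yaml entries. */
final class TaskManagerEnvConf {
    // Config prefix named in this thread for passing env variables to task managers on YARN.
    static final String PREFIX = "containerized.taskmanager.env.";

    /** Returns one "key: "value"" line per variable, sorted by variable name. */
    static List<String> toConfLines(Map<String, String> env) {
        List<String> lines = new ArrayList<>();
        // TreeMap gives a stable order so the generated file is reproducible.
        for (Map.Entry<String, String> e : new TreeMap<>(env).entrySet()) {
            lines.add(PREFIX + e.getKey() + ": \"" + e.getValue() + "\"");
        }
        return lines;
    }

    public static void main(String[] args) {
        Map<String, String> env = new TreeMap<>();
        env.put("LD_LIBRARY_PATH", "/usr/lib/native");
        env.put("API_TOKEN", "<your-token>"); // placeholder value, an assumption
        toConfLines(env).forEach(System.out::println);
    }
}
```

With such entries in place, System.getenv("API_TOKEN") inside open() should
see the value, because it is set in the task manager process itself.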

Thanks,
Zhu Zhu

On Sun, Aug 25, 2019 at 2:48 AM Abhishek Jain wrote:

> Hi Miki,
> Thanks for your reply. ParameterTool will only help in making the value
> accessible through ParameterTool.get(). However, I need a way of accessing
> the value using "System.getenv" since the underlying library uses it so.
>
> On Sat, 24 Aug 2019 at 23:04, miki haiat  wrote:
>
>> Did you register your system environment parameter ?
>>
>> You can find here several ways to use configuration data [1]
>>
>> 1.
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/best_practices.html
>>
>>
>> On Sat, Aug 24, 2019, 20:26 Abhishek Jain  wrote:
>>
>>> Hi!
>>>
>>> I am using a library that depends on a certain environment variable set
>>> (mandatorily). Now, I've exported this variable in my environment but
>>> somehow it's not being read by the task manager. Following is the exception
>>> I get when I try to run the job:
>>>
>>> Caused by: com.example.MyCustomException: Env token is null
>>> at com.example.AerospikeSink.open(AerospikeSink.java:47)
>>> at
>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> Here's the code that throws this exception:
>>>
>>> @Override
>>> public void open(Configuration config) throws Exception {
>>> if (System.getenv("API_TOKEN") == null) {
>>> throw new MyCustomException("Env token is null");
>>> }
>>> }
>>>
>>> My question: Is there an alternative to System.getenv() that I can use
>>> to access environment variables inside of flink task?
>>>
>>> ( P.S. I've only copied relevant code snippet to avoid confusion. I do
>>> intend to use API_TOKEN later on. )
>>>
>>> --
>>> Warm Regards,
>>> Abhishek Jain
>>>
>>
>
> --
> Warm Regards,
> Abhishek Jain
>


Re: Problem with Flink on Yarn

2019-08-23 Thread Zhu Zhu
Hi Juan,

Have you tried a Flink release built with Hadoop 2.7 or later?
If you are using Flink 1.8/1.9, that would be the pre-bundled Hadoop 2.7+ jar,
which can be found on the Flink download page.

I think YARN-3103 is about AMRMClientImpl.class, which is in the Flink
shaded Hadoop jar.

Thanks,
Zhu Zhu

On Fri, Aug 23, 2019 at 7:48 PM Juan Gentile wrote:

> Hello!
>
>
>
> We are running Flink on Yarn and we are currently getting the following
> error:
>
>
>
> *2019-08-23 06:11:01,534 WARN
> org.apache.hadoop.security.UserGroupInformation   -
> PriviledgedActionException as: (auth:KERBEROS)
> cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
> Invalid AMRMToken from appattempt_1564713228886_5299648_01*
>
> *2019-08-23 06:11:01,535 WARN
> org.apache.hadoop.ipc.Client  - Exception
> encountered while connecting to the server :
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
> Invalid AMRMToken from appattempt_1564713228886_5299648_01*
>
> *2019-08-23 06:11:01,536 WARN
> org.apache.hadoop.security.UserGroupInformation   -
> PriviledgedActionException as:  (auth:KERBEROS)
> cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
> Invalid AMRMToken from appattempt_1564713228886_5299648_01*
>
> *2019-08-23 06:11:01,581 WARN
> org.apache.hadoop.io.retry.RetryInvocationHandler - Exception
> while invoking ApplicationMasterProtocolPBClientImpl.allocate over rm0. Not
> retrying because Invalid or Cancelled Token*
>
> *org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid
> AMRMToken from appattempt_1564713228886_5299648_01*
>
> *at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)*
>
> *at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)*
>
> *at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)*
>
> *at java.lang.reflect.Constructor.newInstance(Constructor.java:423)*
>
> *at
> org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53)*
>
> *at
> org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:104)*
>
> *at
> org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:79)*
>
> *at sun.reflect.GeneratedMethodAccessor37.invoke(Unknown Source)*
>
> *at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)*
>
> *at java.lang.reflect.Method.invoke(Method.java:498)*
>
> *at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:288)*
>
> *at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:206)*
>
> *at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:188)*
>
> *at com.sun.proxy.$Proxy26.allocate(Unknown Source)*
>
> *at
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:277)*
>
> *at
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:224)*
>
> *Caused by:
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
> Invalid AMRMToken from appattempt_1564713228886_5299648_01*
>
> *at org.apache.hadoop.ipc.Client.call(Client.java:1472)*
>
> *at org.apache.hadoop.ipc.Client.call(Client.java:1409)*
>
> *at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:231)*
>
> *at com.sun.proxy.$Proxy25.allocate(Unknown Source)*
>
> *at
> org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77)*
>
> *... 9 more*
>
>
>
> The Flink cluster runs ok for a while but then after a day we get this
> error again. We haven’t made changes to our code so that’s why it’s hard to
> understand why all of a sudden we started to see this.
>
>
>
> We found this issue reported on Yarn
> https://issues.apache.org/jira/browse/YARN-3103 but our version of Yarn
> already has that fix.
>
>
>
> Any help will be appreciated.
>
>
>
> Thank you,
>
> Juan
>


Re: [ANNOUNCE] Apache Flink 1.9.0 released

2019-08-22 Thread Zhu Zhu
Thanks Gordon for the update.
Congratulations that we have Flink 1.9.0 released!
Thanks to all the contributors.

Thanks,
Zhu Zhu


On Thu, Aug 22, 2019 at 8:10 PM Eliza wrote:

>
>
> On 2019/8/22 (Thursday) 8:03 PM, Tzu-Li (Gordon) Tai wrote:
> > The Apache Flink community is very happy to announce the release of
> > Apache Flink 1.9.0, which is the latest major release.
>
> Congratulations and thanks~
>
> regards.
>


Re: Externalized checkpoints

2019-08-21 Thread Zhu Zhu
Hi Vishwas,

You can configure "state.checkpoints.num-retained" to specify the maximum
number of completed checkpoints to retain.
By default it is 1.
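
To illustrate why only one chk-N directory is visible at a time, here is a toy
model of a retention window. It is not Flink's implementation;
RetainedCheckpoints and its methods are made-up names, and it only mimics the
"keep the newest N completed checkpoints" behaviour described above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model (not Flink code) of keeping only the newest N completed checkpoints. */
final class RetainedCheckpoints {
    private final int numRetained;
    private final Deque<String> completed = new ArrayDeque<>();

    RetainedCheckpoints(int numRetained) {
        if (numRetained < 1) {
            throw new IllegalArgumentException("numRetained must be >= 1");
        }
        this.numRetained = numRetained;
    }

    /** Records a completed checkpoint and returns those that fall out of the window. */
    List<String> complete(String checkpointDir) {
        completed.addLast(checkpointDir);
        List<String> discarded = new ArrayList<>();
        while (completed.size() > numRetained) {
            discarded.add(completed.removeFirst()); // oldest first
        }
        return discarded;
    }

    /** Checkpoints currently kept, oldest first. */
    List<String> retained() {
        return new ArrayList<>(completed);
    }
}
```

With a window of 1, completing chk-12 after chk-6 discards chk-6, which matches
the two bucket listings in the question. Setting
state.checkpoints.num-retained to, say, 3 in flink-conf.yaml should leave the
three newest checkpoints available for restore.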

Thanks,
Zhu Zhu

On Thu, Aug 22, 2019 at 6:48 AM Vishwas Siravara wrote:

> I am also using exactly once checkpointing mode, I have a kafka source and
> sink so both support transactions which should allow for exactly once
> processing. Is this the reason why there is only one checkpoint retained ?
>
> Thanks,
> Vishwas
>
> On Wed, Aug 21, 2019 at 5:26 PM Vishwas Siravara 
> wrote:
>
>> Hi peeps,
>> I am externalizing checkpoints in S3 for my flink job and I retain them
>> on cancellation. However when I look into my S3 bucket where the
>> checkpoints are stored there is only 1 checkpoint at any point in time . Is
>> this the default behavior of flink where older checkpoints are deleted when
>> the current checkpoint completes ? Here are a few screenshots. What are
>> your thoughts on restoring an older state which is not the previous state ?
>>
>> List contents of bucket at time 0
>>
>> Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/6af4f345-49e0-4ae1-baae-1f7c4d71ebf4
>> Last modified time: Wed Aug 21 22:17:23 GMT 2019
>> Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-6/_metadata
>> Last modified time: Wed Aug 21 22:17:24 GMT 2019
>>
>> List contents of bucket at time 1
>>
>> Printing last modified times
>> Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/7cf17042-7790-4909-9252-73511d93f518
>> Last modified time: Wed Aug 21 22:23:24 GMT 2019
>> Object Name: checkpoints/fb9fea316bf2d530a6fc54ea107d66d4/chk-12/_metadata
>> Last modified time: Wed Aug 21 22:23:24 GMT 2019
>>
>> Thanks,
>>
>> Vishwas
>>
>>


Re: TaskManager not connecting to ResourceManager in HA mode

2019-08-21 Thread Zhu Zhu
Hi Aleksandar,

The resource manager address is retrieved from the HA services.
Would you check whether your customized HA services are returning the right
LeaderRetrievalService, and whether that LeaderRetrievalService is really
retrieving the right leader's address?
Or is it possible that the resource manager address stored in HA is
being replaced by the jobmanager address in some case?
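
One cheap way to catch such a mix-up while debugging is to assert on the
endpoint name of whatever address the custom HA services hand out for the
resource manager. This is only a sketch: RpcAddressCheck is a hypothetical
helper, and the expected name "resourcemanager" is taken from the jobmanager
log lines quoted below, not from any documented contract.

```java
/** Hypothetical debugging helper (not part of Flink) for Akka-style RPC addresses. */
final class RpcAddressCheck {
    /** Returns the last path segment, e.g. "resourcemanager" for ".../user/resourcemanager". */
    static String endpointName(String rpcAddress) {
        int slash = rpcAddress.lastIndexOf('/');
        return slash < 0 ? rpcAddress : rpcAddress.substring(slash + 1);
    }

    /** True if the address points at an endpoint named "resourcemanager" (name assumed from the logs). */
    static boolean looksLikeResourceManager(String rpcAddress) {
        return "resourcemanager".equals(endpointName(rpcAddress));
    }
}
```

Logging the result of looksLikeResourceManager(...) at the point where the
leader address is stored and again where it is retrieved should show on which
side the jobmanager path sneaks in.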

Thanks,
Zhu Zhu

On Thu, Aug 22, 2019 at 8:16 AM Aleksandar Mastilovic wrote:

> Hi all,
>
> I’m experimenting with using my own implementation of HA services instead
> of ZooKeeper that would persist JobManager information on a Kubernetes
> volume instead of in ZooKeeper.
>
> I’ve set the high-availability option in flink-conf.yaml to the FQN of my
> factory class, and started the docker ensemble as I usually do (i.e. with
> no special “cluster” arguments or scripts.)
>
> What’s happening now is that TaskManager is unable to connect to
> ResourceManager, because it seems it’s trying to use the /user/jobmanager
> path instead of /user/resourcemanager.
>
> Here’s what I found in the logs:
>
>
> jobmanager_1| 2019-08-22 00:05:00,963 INFO  akka.remote.Remoting
>- Remoting started; listening on
> addresses :[akka.tcp://flink@jobmanager:6123]
> jobmanager_1| 2019-08-22 00:05:00,975 INFO
>  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Actor
> system started at akka.tcp://flink@jobmanager:6123
>
> jobmanager_1| 2019-08-22 00:05:02,380 INFO
>  org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting
> RPC endpoint for
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at
> akka://flink/user/resourcemanager .
>
> jobmanager_1| 2019-08-22 00:05:03,138 INFO
>  org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting
> RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher
> at akka://flink/user/dispatcher .
>
> jobmanager_1| 2019-08-22 00:05:03,211 INFO
>  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> ResourceManager akka.tcp://flink@jobmanager:6123/user/resourcemanager was
> granted leadership with fencing token 
>
> jobmanager_1| 2019-08-22 00:05:03,292 INFO
>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Dispatcher
> akka.tcp://flink@jobmanager:6123/user/dispatcher was granted leadership
> with fencing token ----
>
> taskmanager_1   | 2019-08-22 00:05:03,713 INFO
>  org.apache.flink.runtime.taskexecutor.TaskExecutor- Connecting
> to ResourceManager
> akka.tcp://flink@jobmanager:6123/user/jobmanager()
> .
> taskmanager_1   | 2019-08-22 00:05:04,137 INFO
>  org.apache.flink.runtime.taskexecutor.TaskExecutor- Could not
> resolve ResourceManager address
> akka.tcp://flink@jobmanager:6123/user/jobmanager, retrying in 1 ms:
> Could not connect to rpc endpoint under address
> akka.tcp://flink@jobmanager:6123/user/jobmanager..
>
> Is this a known bug? I’d appreciate any help I can get.
>
> Thanks,
> Aleksandar Mastilovic
>


Re: Customize file assignments logic in flink application

2019-08-16 Thread Zhu Zhu
Hi Lu,

I think it's OK to choose either way as long as it works.
Though I have no idea how you would extend SplittableIterator in your case.
The underlying input format is ParallelIteratorInputFormat, and its processing
is not tied to a certain subtask index.

Thanks,
Zhu Zhu

On Fri, Aug 16, 2019 at 12:48 AM Lu Niu wrote:

> Hi, Zhu
>
> Thanks for reply! I found using SplittableIterator is also doable to some
> extent. How to choose between these two?
>
> Best
> Lu
>
> On Wed, Aug 14, 2019 at 8:02 PM Zhu Zhu  wrote:
>
>> Hi Lu,
>>
>> Implementing your own *InputFormat*, and the *InputSplitAssigner* created by
>> it (which has the interface "InputSplit getNextInputSplit(String host, int
>> taskId)"), should work if you want to assign InputSplits to
>> tasks according to the task index and file name patterns.
>> To assign 2 *InputSplit*s in one request, you can implement a new
>> *InputSplit* which wraps multiple *FileInputSplit*s. You may also need to
>> define in your *InputFormat* how to process the new *InputSplit*.
>>
>> Thanks,
>> Zhu Zhu
>>
>> On Thu, Aug 15, 2019 at 12:26 AM Lu Niu wrote:
>>
>>> Hi,
>>>
>>> I have a data set backed by a directory of files in which file names are
>>> meaningful.
>>>
>>> folder1
>>>+-file01
>>>+-file02
>>>+-file03
>>>+-file04
>>>
>>> I want to control the file assignments in my flink application. For
>>> example, when parallelism is 2, worker 1 get file01 and file02 to read and
>>> worker2 get 3 and 4. Also each worker get 2 files all at once because
>>> reading requires jumping back and forth between those two files.
>>>
>>> What's the best way to do this? It seems like FileInputFormat is not
>>> extensible in this case.
>>>
>>> Best
>>> Lu
>>>
>>>
>>>


Re: How to load udf jars in flink program

2019-08-15 Thread Zhu Zhu
Hi Jiangang,

Does "flink run -j jarpath ..." work for you?
If that jar is deployed to the same path on each worker machine, you can
try "flink run -C classpath ..." as well.
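
Note that -C takes a URL rather than a plain file path. A small sketch of the
conversion, assuming a local jar path; ClasspathUrl is a hypothetical helper
name and /opt/udfs/my-udf.jar is just an example path:

```java
import java.nio.file.Paths;

/** Hypothetical helper: turns a local jar path into a URL string usable with "flink run -C". */
final class ClasspathUrl {
    static String of(String jarPath) {
        // toAbsolutePath() resolves relative paths against the current working directory.
        return Paths.get(jarPath).toAbsolutePath().toUri().toString();
    }

    public static void main(String[] args) {
        // Example path, an assumption; the jar must exist at this path on every node.
        System.out.println(ClasspathUrl.of("/opt/udfs/my-udf.jar"));
    }
}
```

The printed value can then be passed as "flink run -C <url> ...".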

Thanks,
Zhu Zhu

On Thu, Aug 15, 2019 at 5:31 PM 刘建刚 wrote:

>   We use per-job mode and load a udf jar when the job starts. Our jar file is
> in another path, not in flink's lib path. In the main function, we use a
> classLoader to load the jar file by its path. But it reports the
> following error when the job starts running.
>   If the jar file is in lib, everything is ok. But our udf jar file is
> managed in a special path. How can I load udf jars in a flink program
> given only the jar path?
>
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>   at 
> org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
>   at 
> org.apache.flink.table.runtime.CRowProcessRunner.compile(CRowProcessRunner.scala:35)
>   at 
> org.apache.flink.table.runtime.CRowProcessRunner.open(CRowProcessRunner.scala:49)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>   at 
> org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:723)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.codehaus.commons.compiler.CompileException: Line 5, Column 1: 
> Cannot determine simple type name "com"
>   at 
> org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11877)
>   at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6758)
>   at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6519)
>   at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6532)
>   at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6532)
>   at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6532)
>   at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6532)
>   at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6498)
>   at org.codehaus.janino.UnitCompiler.access$14000(UnitCompiler.java:218)
>   at 
> org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6405)
>   at 
> org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6400)
>   at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3983)
>   at org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6400)
>   at org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6393)
>   at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3982)
>   at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6393)
>   at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:218)
>   at org.codehaus.janino.UnitCompiler$25.getType(UnitCompiler.java:8206)
>   at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6798)
>   at org.codehaus.janino.UnitCompiler.access$14500(UnitCompiler.java:218)
>   at 
> org.codehaus.janino.UnitCompiler$22$2$1.visitFieldAccess(UnitCompiler.java:6423)
>   at 
> org.codehaus.janino.UnitCompiler$22$2$1.visitFieldAccess(UnitCompiler.java:6418)
>   at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4365)
>   at 
> org.codehaus.janino.UnitCompiler$22$2.visitLvalue(UnitCompiler.java:6418)
>   at 
> org.codehaus.janino.UnitCompiler$22$2.visitLvalue(UnitCompiler.java:6414)
>   at org.codehaus.janino.Java$Lvalue.accept(Java.java:4203)
>   at 
> org.codehaus.janino.UnitCompiler$22.visitRvalue(UnitCompiler.java:6414)
>   at 
> org.codehaus.janino.UnitCompiler$22.visitRvalue(UnitCompiler.java:6393)
>   at org.codehaus.janino.Java$Rvalue.accept(Java.java:4171)
>   at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6393)
>   at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6780)
>   at org.codehaus.janino.UnitCompiler.access$14300(UnitCompiler.java:218)
>   at 
> org.codehaus.janino.UnitCompiler$22$2$1.visitAmbiguousName(UnitCompiler.java:6421)
>   at 
> org.codehaus.janino.UnitCompiler$22$2$1.visitAmbiguousName(UnitCompiler.java:6418)
>   at org.codehaus.janino.Java$A

Re: Customize file assignments logic in flink application

2019-08-14 Thread Zhu Zhu
Hi Lu,

Implementing your own *InputFormat*, and the *InputSplitAssigner* created by
it (which has the interface "InputSplit getNextInputSplit(String host, int
taskId)"), should work if you want to assign InputSplits to tasks according to
the task index and file name patterns.
To assign 2 *InputSplit*s in one request, you can implement a new
*InputSplit* which wraps multiple *FileInputSplit*s. You may also need to
define in your *InputFormat* how to process the new *InputSplit*.
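
The assignment logic itself can be kept independent of Flink. Below is a
minimal sketch of it, assuming a 0-based task index and a file list that is
already in the desired order. FileGroupAssigner and filesFor are hypothetical
names; a custom getNextInputSplit(host, taskId) could call something like this
and wrap the returned files into one combined split.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a Flink class): picks the files a given subtask should read. */
final class FileGroupAssigner {
    /**
     * Splits files into contiguous groups of filesPerTask and returns the group
     * for taskId (0-based). Returns an empty list if there is no such group.
     */
    static List<String> filesFor(List<String> files, int filesPerTask, int taskId) {
        if (filesPerTask < 1 || taskId < 0) {
            throw new IllegalArgumentException("filesPerTask must be >= 1 and taskId >= 0");
        }
        long start = (long) taskId * filesPerTask; // long avoids int overflow
        int from = (int) Math.min(start, (long) files.size());
        int to = (int) Math.min(start + filesPerTask, (long) files.size());
        return new ArrayList<>(files.subList(from, to));
    }
}
```

For the folder in the question and a parallelism of 2, task 0 would get file01
and file02, and task 1 would get file03 and file04.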

Thanks,
Zhu Zhu

On Thu, Aug 15, 2019 at 12:26 AM Lu Niu wrote:

> Hi,
>
> I have a data set backed by a directory of files in which file names are
> meaningful.
>
> folder1
>+-file01
>+-file02
>+-file03
>+-file04
>
> I want to control the file assignments in my flink application. For
> example, when parallelism is 2, worker 1 get file01 and file02 to read and
> worker2 get 3 and 4. Also each worker get 2 files all at once because
> reading requires jumping back and forth between those two files.
>
> What's the best way to do this? It seems like FileInputFormat is not
> extensible in this case.
>
> Best
> Lu
>
>
>


Re: How can I pass jvm options to flink when started from command line

2019-08-14 Thread Zhu Zhu
Hi Vishwas,

If what you want is to set JVM options for the Flink client JVM when running
jobs with "flink run", I think exporting the variable 'JVM_ARGS' should help.
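
For example, after running export JVM_ARGS="-Dconfig.resource=qa.conf" in the
shell that calls "flink run", the main method could pick the value up as a
system property. A minimal sketch, where ClientConfig and the fallback value
are assumptions made for illustration:

```java
/** Hypothetical client-side helper: reads the config resource name from a JVM system property. */
final class ClientConfig {
    /** Returns the value of -Dconfig.resource=..., or defaultResource if it was not set. */
    static String configResource(String defaultResource) {
        return System.getProperty("config.resource", defaultResource);
    }

    public static void main(String[] args) {
        // "default.conf" is an arbitrary fallback chosen for this sketch.
        System.out.println("Using config resource: " + configResource("default.conf"));
    }
}
```

This only covers code that runs in the client JVM (the main method); code
running inside the task managers would not see the property this way.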

Thanks,
Zhu Zhu

On Thu, Aug 15, 2019 at 4:03 AM Vishwas Siravara wrote:

> I understand that when I run a flink job from command line it forks a jvm
> and runs the main method and the flink related code run in the task
> manager. So when I say "flink run " the main does not run on JobManager
> hence it does not take env.java.options set in the flink-conf.yaml as this
> applies to the job manager and task manager. Now how can I pass jvm options
> like -Dconfig.resource=qa.conf from command line ?
>
> Thanks,
> Vishwas
>


Re: Why available task slots are not leveraged for pipeline?

2019-08-12 Thread Zhu Zhu
Hi Cam,

Zili is correct.
Each shared slot can host at most one instance of each different
task (JobVertex). So you will have at most 13 tasks in each slot. See
https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/runtime.html#task-slots-and-resources

To specify parallelism individually, you can invoke setParallelism on
each operator.
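
The slot arithmetic can be written down as a small model: within one slot
sharing group the number of slots needed is the maximum parallelism of its
tasks, and separate groups add up. This is a sketch of that rule only, not
Flink's scheduler; SlotEstimate and requiredSlots are made-up names.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (not Flink code) of how many slots a job needs under slot sharing. */
final class SlotEstimate {
    /**
     * groupOf[i] is the slot sharing group of task i and parallelism[i] its parallelism.
     * Result: sum over groups of the maximum parallelism within each group.
     */
    static int requiredSlots(String[] groupOf, int[] parallelism) {
        if (groupOf.length != parallelism.length) {
            throw new IllegalArgumentException("arrays must have the same length");
        }
        Map<String, Integer> maxPerGroup = new HashMap<>();
        for (int i = 0; i < groupOf.length; i++) {
            maxPerGroup.merge(groupOf[i], parallelism[i], Integer::max);
        }
        int total = 0;
        for (int max : maxPerGroup.values()) {
            total += max;
        }
        return total;
    }
}
```

For the original pipeline (13 tasks, all in the default group, parallelism 5)
this gives 5 slots, not 65. Moving one task into its own group via
slotSharingGroup(...) would raise it to 10.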

Thanks,
Zhu Zhu

On Mon, Aug 12, 2019 at 8:00 PM Zili Chen wrote:

> Hi Cam,
>
> If you set the parallelism to 60, you would make use of all 60 slots you
> have, and in your case each slot executes a chained operator containing 13
> tasks. It is not the case that one slot executes at least 60 sub-tasks.
>
> Best,
> tison.
>
>
> On Mon, Aug 12, 2019 at 7:55 PM Cam Mach wrote:
>
>> Hi Zhu and Abhishek,
>>
>> Thanks for your response and pointers. It's correct, the count of
>> parallelism will be the number of slot used for a pipeline. And, the number
>> (or count) of the parallelism is also used to generate number of sub-tasks
>> for each operator. In my case, I have parallelism of 60, it generates 60
>> sub-tasks for each operator. And so it'll be too much for one slot execute
>> at least 60 sub-tasks. I am wondering if there is a way we can set number
>> of generated sub-tasks, different than number of parallelism?
>>
>> Cam Mach
>> Software Engineer
>> E-mail: cammac...@gmail.com
>> Tel: 206 972 2768
>>
>>
>>
>> On Sun, Aug 11, 2019 at 10:37 PM Zhu Zhu  wrote:
>>
>>> Hi Cam,
>>> This case is expected due to slot sharing.
>>> A slot can be shared by one instance of each different task, so the number
>>> of used slots equals the maximum parallelism among your tasks.
>>> You can specify the sharing group with slotSharingGroup(String
>>> slotSharingGroup) on operators.
>>>
>>> Thanks,
>>> Zhu Zhu
>>>
>>> On Mon, Aug 12, 2019 at 1:23 PM Abhishek Jain wrote:
>>>
>>>> What you're seeing is likely operator chaining. This is the default
>>>> behaviour of grouping sub-tasks to avoid transfer overhead (from one slot to
>>>> another). You can disable chaining if you need to. Please refer to tasks
>>>> and operator chains
>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/concepts/runtime.html#tasks-and-operator-chains>
>>>> .
>>>>
>>>> - Abhishek
>>>>
>>>> On Mon, 12 Aug 2019 at 09:56, Cam Mach  wrote:
>>>>
>>>>> Hello Flink expert,
>>>>>
>>>>> I have a cluster with 10 Task Managers, configured with 6 task slots
>>>>> each, and a pipeline that has 13 tasks/operators with a parallelism of 5.
>>>>> But when running the pipeline I observe that only 5 slots are being used;
>>>>> the other 55 slots are available/free. It should use all of my slots, right,
>>>>> since I have 13 (tasks) x 5 = 65 sub-tasks? What configuration have I
>>>>> missed in order to leverage all of the available slots for my pipelines?
>>>>>
>>>>> Thanks,
>>>>> Cam
>>>>>
>>>>>
>>>>

