RE: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-23 Thread LINZ, Arnaud
Hello,

It’s hard to say what caused the timeout to trigger – I agree with you that it 
should not have stopped the heartbeat thread, but it did. The easy fix was to 
increase the timeout until we no longer saw our app self-killed. The task was 
running a CPU-intensive computation (with a few extra threads created at some 
points… somehow breaking the “slot number” contract).
For the RAM cache, I believe that the heartbeat may also time out 
because of a busy network.

Cheers,
Arnaud


From: Till Rohrmann
Sent: Thursday, 22 July 2021 11:33
To: LINZ, Arnaud
Cc: Gen Luo; Yang Wang; dev; user
Subject: Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

Thanks for your inputs Gen and Arnaud.

I do agree with you, Gen, that we need better guidance for our users on when to 
change the heartbeat configuration. I think this should happen in any case. I 
am, however, not so sure whether we can give a hard threshold like 5000 tasks, 
for example, because as Arnaud said it strongly depends on the workload. Maybe 
we can explain it based on symptoms a user might experience and what to do then.

Concerning your workloads, Arnaud, I'd be interested to learn a bit more. The 
user code runs in its own thread. This means that its operation won't block the 
main thread/heartbeat. The only thing that can happen is that the user code 
starves the heartbeat in terms of CPU cycles or causes a lot of GC pauses. If 
you are observing the former problem, then we might think about changing the 
priorities of the respective threads. This should then improve Flink's 
stability for these workloads and a shorter heartbeat timeout should be 
possible.

Also for the RAM-cached repositories, what exactly is causing the heartbeat to 
time out? Is it because you have a lot of GC or that the heartbeat thread does 
not get enough CPU cycles?

Cheers,
Till

On Thu, Jul 22, 2021 at 9:16 AM LINZ, Arnaud <al...@bouyguestelecom.fr> wrote:
Hello,

From a user perspective: we have some (rare) use cases where we use “coarse 
grain” datasets, with big beans and tasks that do lengthy operations (such as ML 
training). In these cases we had to increase the timeout to huge values 
(heartbeat.timeout: 50) so that our app is not killed.
I’m aware this is not the way Flink was meant to be used, but it’s a convenient 
way to distribute our workload on datanodes without having to use another 
concurrency framework (such as M/R) that would require the recoding of sources 
and sinks.

In some other (most common) cases, our tasks do some R/W accesses to RAM-cached 
repositories backed by a key-value store such as Kudu (or HBase). While most of 
those calls are very fast, when the system is under heavy load they may 
sometimes block for more than a few seconds, and having our app killed because 
of a short timeout is not an option.

That’s why I’m not in favor of very short timeouts… Because in my experience it 
really depends on what user code does in the tasks. (I understand that 
normally, as user code is not a JVM-blocking activity such as a GC, it should 
have no impact on heartbeats, but from experience, it really does)

Cheers,
Arnaud


From: Gen Luo <luogen...@gmail.com>
Sent: Thursday, 22 July 2021 05:46
To: Till Rohrmann <trohrm...@apache.org>
Cc: Yang Wang <danrtsey...@gmail.com>; dev <d...@flink.apache.org>; user <user@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

Hi,
Thanks for driving this @Till Rohrmann <trohrm...@apache.org>. I would 
give a +1 on reducing the heartbeat timeout and interval, though I'm not sure 
whether 15s and 3s would be enough either.

IMO, beyond the standalone cluster, where the heartbeat mechanism is all Flink 
can rely on, reducing the heartbeat can also help the JM find out faster about 
TaskExecutors in abnormal conditions that cannot respond to heartbeat requests, 
e.g. continuous Full GC, even though the TaskExecutor process is alive and the 
deployment system may not notice anything wrong. Since there are cases that 
can benefit from this change, I think it could be done as long as it won't break 
the experience in other scenarios.

If we can identify what blocks the main threads from processing heartbeats, 
or what increases GC cost, we can try to get rid of those causes to get a more 
predictable heartbeat response time, or give some advice to users whose 
jobs may run into these issues. For example, as far as I know the JM of a large 
scale job is busier and may not be able to process heartbeats in time, 
so we could advise users running jobs with more than 5000 tasks 
to enlarge their heartbeat interval to 10s and timeout to 50s. The numbers 
are written casually.

As for the issue in FLINK-23216, I think it should be fixed and may not be a 
main concern for this case.

On Wed, Jul 21, 2021 at 6:26 PM Till Rohr

Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-22 Thread Gen Luo
Thanks for sharing the thoughts Chesnay, and I overall agree with you. We
can't give a default value suitable for all jobs, but we can figure out
whether the current default value is too large for most of the jobs, and
that is the guideline for this topic. Configurability is reserved for the
others.

But maybe we should list the benefits of the change again. The motivation
presented in the FLIP is mostly about why we can, but may not be enough
about why we should, considering that FLINK-23216 will be fixed.

By the way, I'd like to make sure whether the behavior of jobs using default
values will change when they upgrade their Flink version and resume
from savepoints. If so, we have to give a big warning to users if we
finally decide to change this, since the change is silent but can be
critical in some cases.

@Till
> I am, however, not so sure whether we can give hard threshold
like 5000 tasks
Indeed, a hard threshold can't apply to all cases. I meant to suggest a
configuration validation phase, where we can check whether the configuration is
potentially not suitable for the job, or whether some options cannot be
used together with other options. Users will be warned, and if anything
goes wrong, this will be one of the debugging guides. This is a little like
Chesnay's suggestion, but at compile time. Large-scale jobs with too-short
heartbeat intervals are one of the cases, but I agree it's hard to decide
when we should show the warning.

On Thu, Jul 22, 2021 at 11:09 PM Chesnay Schepler 
wrote:

> I'm wondering if this discussion isn't going in the wrong direction.
> It is clear that we cannot support all use cases with the defaults, so
> let's not try; we won't find such values.
> And I would argue that is also not their purpose; they are configurable
> for a reason.
> I would say the defaults should provide a good experience to users
> starting out with Flink.
>
> Because, users with heavy workloads don't deploy Flink over night. They
> approach a production deployment step-by-step, for the very purpose of
> working out kinks in the configuration/stability.
> If in this process the default heartbeat configuration ends up being too
> harsh, *then that is not a problem*.
> *If *there is sufficient information presented to the user on how to
> transition to a working setup, that is.
> So far the purpose of that was the production checklist to some extent,
> but maybe we should also add a separate page for working under bigger loads.
>
> A far greater issue in my opinion is that users don't get warnings that
> something is about to go wrong.
> The heartbeat system right now is fairly binary. It works fine while the
> configuration is suitable, until it no longer is and everything goes up in
> flames.
>
> If we were to log warnings if Flink was close to hitting the heartbeat
> timeout, or even expose metrics for heartbeat round-trip times or similar,
> I think we could alleviate many concerns that people have.
>
> Or we just provide 2 configs with the distribution, one for starting out,
> one for more serious workloads. ¯\_(ツ)_/¯
> On 22/07/2021 13:22, 刘建刚 wrote:
>
> Thanks, Till. There are many reasons to reduce the heartbeat interval and
> timeout. But I am not sure what values are suitable. In our cases, the GC
> time and big job can be related factors. Since most flink jobs are pipeline
> and a total failover can cost some time, we should tolerate some stop-world
> situations. Also, I think that the FLINK-23216 should be solved to detect
> lost container fast and react to it. For my side, I suggest
> reducing the values gradually.
>
> On Thu, Jul 22, 2021 at 5:33 PM, Till Rohrmann wrote:
>
>> Thanks for your inputs Gen and Arnaud.
>>
>> I do agree with you, Gen, that we need better guidance for our users on
>> when to change the heartbeat configuration. I think this should happen in
>> any case. I am, however, not so sure whether we can give hard threshold
>> like 5000 tasks, for example, because as Arnaud said it strongly depends
>> on
>> the workload. Maybe we can explain it based on symptoms a user might
>> experience and what to do then.
>>
>> Concerning your workloads, Arnaud, I'd be interested to learn a bit more.
>> The user code runs in its own thread. This means that its operation won't
>> block the main thread/heartbeat. The only thing that can happen is that
>> the
>> user code starves the heartbeat in terms of CPU cycles or causes a lot of
>> GC pauses. If you are observing the former problem, then we might think
>> about changing the priorities of the respective threads. This should then
>> improve Flink's stability for these workloads and a shorter heartbeat
>> timeout should be possible.
>>
>>

Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-22 Thread Chesnay Schepler

I'm wondering if this discussion isn't going in the wrong direction.
It is clear that we cannot support all use cases with the defaults, so 
let's not try; we won't find such values.
And I would argue that is also not their purpose; they are configurable 
for a reason.
I would say the defaults should provide a good experience to users 
starting out with Flink.


Because, users with heavy workloads don't deploy Flink over night. They 
approach a production deployment step-by-step, for the very purpose of 
working out kinks in the configuration/stability.
If in this process the default heartbeat configuration ends up being 
too harsh, *then that is not a problem*.
*If* there is sufficient information presented to the user on how to 
transition to a working setup, that is.
So far the purpose of that was the production checklist to some extent, 
but maybe we should also add a separate page for working under bigger loads.


A far greater issue in my opinion is that users don't get warnings that 
something is about to go wrong.
The heartbeat system right now is fairly binary. It works fine while the 
configuration is suitable, until it no longer is and everything goes up 
in flames.


If we were to log warnings if Flink was close to hitting the heartbeat 
timeout, or even expose metrics for heartbeat round-trip times or 
similar, I think we could alleviate many concerns that people have.


Or we just provide 2 configs with the distribution, one for starting 
out, one for more serious workloads. ¯\_(ツ)_/¯
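
As a purely illustrative sketch of the warning idea above (this is not Flink's
actual heartbeat code; the class and method names are made up for the example):

```java
import java.time.Duration;
import java.time.Instant;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Hypothetical helper that warns when heartbeat round trips get close to the timeout. */
final class HeartbeatLatencyWatcher {
    private static final Logger LOG = LoggerFactory.getLogger(HeartbeatLatencyWatcher.class);

    private final Duration timeout;  // the configured heartbeat.timeout
    private final double warnRatio;  // e.g. 0.5 = warn once a round trip exceeds 50% of the timeout

    HeartbeatLatencyWatcher(Duration timeout, double warnRatio) {
        this.timeout = timeout;
        this.warnRatio = warnRatio;
    }

    /** Called when a heartbeat response arrives; requestTime is when the request was sent. */
    void onHeartbeatResponse(String targetId, Instant requestTime) {
        long roundTripMillis = Duration.between(requestTime, Instant.now()).toMillis();
        if (roundTripMillis > timeout.toMillis() * warnRatio) {
            LOG.warn(
                    "Heartbeat round trip to {} took {} ms ({}% of the configured timeout). "
                            + "Consider a larger heartbeat.timeout or investigate GC/CPU pressure.",
                    targetId,
                    roundTripMillis,
                    100 * roundTripMillis / timeout.toMillis());
        }
        // A round-trip-time metric could be registered here as well, as suggested above.
    }
}
```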


On 22/07/2021 13:22, 刘建刚 wrote:
Thanks, Till. There are many reasons to reduce the heartbeat interval 
and timeout. But I am not sure what values are suitable. In our cases, 
the GC time and big job can be related factors. Since most flink jobs 
are pipeline and a total failover can cost some time, we should 
tolerate some stop-world situations. Also, I think that 
the FLINK-23216 should be solved to detect lost container fast and 
react to it. For my side, I suggest reducing the values gradually.


On Thu, Jul 22, 2021 at 5:33 PM, Till Rohrmann <trohrm...@apache.org> wrote:


Thanks for your inputs Gen and Arnaud.

I do agree with you, Gen, that we need better guidance for our
users on
when to change the heartbeat configuration. I think this should
happen in
any case. I am, however, not so sure whether we can give hard
threshold
like 5000 tasks, for example, because as Arnaud said it strongly
depends on
the workload. Maybe we can explain it based on symptoms a user might
experience and what to do then.

Concerning your workloads, Arnaud, I'd be interested to learn a
bit more.
The user code runs in its own thread. This means that its
operation won't
block the main thread/heartbeat. The only thing that can happen is
that the
user code starves the heartbeat in terms of CPU cycles or causes a
lot of
GC pauses. If you are observing the former problem, then we might
think
about changing the priorities of the respective threads. This
should then
improve Flink's stability for these workloads and a shorter heartbeat
timeout should be possible.

Also for the RAM-cached repositories, what exactly is causing the
heartbeat
to time out? Is it because you have a lot of GC or that the heartbeat
thread does not get enough CPU cycles?

Cheers,
Till

On Thu, Jul 22, 2021 at 9:16 AM LINZ, Arnaud <al...@bouyguestelecom.fr> wrote:

> Hello,
>
>
>
> From a user perspective: we have some (rare) use cases where we use
> “coarse grain” datasets, with big beans and tasks that do
lengthy operation
> (such as ML training). In these cases we had to increase the
time out to
> huge values (heartbeat.timeout: 50) so that our app is not
killed.
>
> I’m aware this is not the way Flink was meant to be used, but it’s a
> convenient way to distribute our workload on datanodes without
having to
> use another concurrency framework (such as M/R) that would
require the
> recoding of sources and sinks.
>
>
>
> In some other (most common) cases, our tasks do some R/W accesses to
> RAM-cached repositories backed by a key-value storage such as
Kudu (or
> Hbase). If most of those calls are very fast, sometimes when the
system is
> under heavy load they may block more than a few seconds, and
having our app
> killed because of a short timeout is not an option.
>
>
>
> That’s why I’m not in favor of very short timeouts… Because in my
> experience it really depends on what user code does in the tasks. (I
> understand that normally, as user code is not a JVM-blocking
activity such
> as a GC, it should have no impact on heartbeats, but fro

Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-22 Thread 刘建刚
Thanks, Till. There are many reasons to reduce the heartbeat interval and
timeout, but I am not sure what values are suitable. In our cases, GC
time and big jobs can be relevant factors. Since most Flink jobs are pipelined
and a full failover can cost some time, we should tolerate some stop-the-world
situations. Also, I think that FLINK-23216 should be solved so that lost
containers are detected fast and reacted to. For my part, I suggest
reducing the values gradually.

On Thu, Jul 22, 2021 at 5:33 PM, Till Rohrmann wrote:

> Thanks for your inputs Gen and Arnaud.
>
> I do agree with you, Gen, that we need better guidance for our users on
> when to change the heartbeat configuration. I think this should happen in
> any case. I am, however, not so sure whether we can give hard threshold
> like 5000 tasks, for example, because as Arnaud said it strongly depends on
> the workload. Maybe we can explain it based on symptoms a user might
> experience and what to do then.
>
> Concerning your workloads, Arnaud, I'd be interested to learn a bit more.
> The user code runs in its own thread. This means that its operation won't
> block the main thread/heartbeat. The only thing that can happen is that the
> user code starves the heartbeat in terms of CPU cycles or causes a lot of
> GC pauses. If you are observing the former problem, then we might think
> about changing the priorities of the respective threads. This should then
> improve Flink's stability for these workloads and a shorter heartbeat
> timeout should be possible.
>
> Also for the RAM-cached repositories, what exactly is causing the heartbeat
> to time out? Is it because you have a lot of GC or that the heartbeat
> thread does not get enough CPU cycles?
>
> Cheers,
> Till
>
> On Thu, Jul 22, 2021 at 9:16 AM LINZ, Arnaud 
> wrote:
>
> > Hello,
> >
> >
> >
> > From a user perspective: we have some (rare) use cases where we use
> > “coarse grain” datasets, with big beans and tasks that do lengthy
> operation
> > (such as ML training). In these cases we had to increase the time out to
> > huge values (heartbeat.timeout: 50) so that our app is not killed.
> >
> > I’m aware this is not the way Flink was meant to be used, but it’s a
> > convenient way to distribute our workload on datanodes without having to
> > use another concurrency framework (such as M/R) that would require the
> > recoding of sources and sinks.
> >
> >
> >
> > In some other (most common) cases, our tasks do some R/W accesses to
> > RAM-cached repositories backed by a key-value storage such as Kudu (or
> > Hbase). If most of those calls are very fast, sometimes when the system
> is
> > under heavy load they may block more than a few seconds, and having our
> app
> > killed because of a short timeout is not an option.
> >
> >
> >
> > That’s why I’m not in favor of very short timeouts… Because in my
> > experience it really depends on what user code does in the tasks. (I
> > understand that normally, as user code is not a JVM-blocking activity
> such
> > as a GC, it should have no impact on heartbeats, but from experience, it
> > really does)
> >
> >
> >
> > Cheers,
> >
> > Arnaud
> >
> >
> >
> >
> >
> > *From:* Gen Luo 
> > *Sent:* Thursday, 22 July 2021 05:46
> > *To:* Till Rohrmann 
> > *Cc:* Yang Wang; dev; user
> > *Subject:* Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval
> > default values
> >
> >
> >
> > Hi,
> >
> > Thanks for driving this @Till Rohrmann  . I would
> > give +1 on reducing the heartbeat timeout and interval, though I'm not
> sure
> > if 15s and 3s would be enough either.
> >
> >
> >
> > IMO, except for the standalone cluster, where the heartbeat mechanism in
> > Flink is totally relied, reducing the heartbeat can also help JM to find
> > out faster TaskExecutors in abnormal conditions that can not respond to
> the
> > heartbeat requests, e.g., continuously Full GC, though the process of
> > TaskExecutor is alive and may not be known by the deployment system.
> Since
> > there are cases that can benefit from this change, I think it could be
> done
> > if it won't break the experience in other scenarios.
> >
> >
> >
> > If we can address what will block the main threads from processing
> > heartbeats, or enlarge the GC costs, we can try to get rid of them to
> have
> > a more predictable response time of heartbeat, or give some advices to
> > users if their jobs may encounter these issues. For example, as far 

Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-22 Thread Till Rohrmann
Thanks for your inputs Gen and Arnaud.

I do agree with you, Gen, that we need better guidance for our users on
when to change the heartbeat configuration. I think this should happen in
any case. I am, however, not so sure whether we can give a hard threshold
like 5000 tasks, for example, because as Arnaud said it strongly depends on
the workload. Maybe we can explain it based on symptoms a user might
experience and what to do then.

Concerning your workloads, Arnaud, I'd be interested to learn a bit more.
The user code runs in its own thread. This means that its operation won't
block the main thread/heartbeat. The only thing that can happen is that the
user code starves the heartbeat in terms of CPU cycles or causes a lot of
GC pauses. If you are observing the former problem, then we might think
about changing the priorities of the respective threads. This should then
improve Flink's stability for these workloads and a shorter heartbeat
timeout should be possible.

Also for the RAM-cached repositories, what exactly is causing the heartbeat
to time out? Is it because you have a lot of GC or that the heartbeat
thread does not get enough CPU cycles?

Cheers,
Till
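
As a plain-Java illustration of the thread-priority idea (a hypothetical sketch,
not how Flink actually creates its threads), heartbeat threads could be created
with a higher priority than user-code threads so that busy user code is less
likely to starve them of CPU cycles:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: run heartbeat scheduling on a thread with elevated priority. */
public final class PriorityHeartbeatThreadExample {
    public static void main(String[] args) throws InterruptedException {
        ThreadFactory heartbeatThreadFactory = runnable -> {
            Thread t = new Thread(runnable, "heartbeat-sender");
            // User code usually runs at NORM_PRIORITY; a higher priority is only a hint
            // to the scheduler, but it makes starvation by CPU-heavy user code less likely.
            t.setPriority(Thread.MAX_PRIORITY);
            t.setDaemon(true);
            return t;
        };

        ScheduledExecutorService heartbeatExecutor =
                Executors.newSingleThreadScheduledExecutor(heartbeatThreadFactory);

        // Stand-in for sending a heartbeat request every 3 seconds.
        heartbeatExecutor.scheduleAtFixedRate(
                () -> System.out.println("send heartbeat"), 0, 3, TimeUnit.SECONDS);

        Thread.sleep(10_000);
        heartbeatExecutor.shutdownNow();
    }
}
```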

On Thu, Jul 22, 2021 at 9:16 AM LINZ, Arnaud 
wrote:

> Hello,
>
>
>
> From a user perspective: we have some (rare) use cases where we use
> “coarse grain” datasets, with big beans and tasks that do lengthy operation
> (such as ML training). In these cases we had to increase the time out to
> huge values (heartbeat.timeout: 50) so that our app is not killed.
>
> I’m aware this is not the way Flink was meant to be used, but it’s a
> convenient way to distribute our workload on datanodes without having to
> use another concurrency framework (such as M/R) that would require the
> recoding of sources and sinks.
>
>
>
> In some other (most common) cases, our tasks do some R/W accesses to
> RAM-cached repositories backed by a key-value storage such as Kudu (or
> Hbase). If most of those calls are very fast, sometimes when the system is
> under heavy load they may block more than a few seconds, and having our app
> killed because of a short timeout is not an option.
>
>
>
> That’s why I’m not in favor of very short timeouts… Because in my
> experience it really depends on what user code does in the tasks. (I
> understand that normally, as user code is not a JVM-blocking activity such
> as a GC, it should have no impact on heartbeats, but from experience, it
> really does)
>
>
>
> Cheers,
>
> Arnaud
>
>
>
>
>
> *From:* Gen Luo 
> *Sent:* Thursday, 22 July 2021 05:46
> *To:* Till Rohrmann 
> *Cc:* Yang Wang; dev; user
> *Subject:* Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval
> default values
>
>
>
> Hi,
>
> Thanks for driving this @Till Rohrmann  . I would
> give +1 on reducing the heartbeat timeout and interval, though I'm not sure
> if 15s and 3s would be enough either.
>
>
>
> IMO, except for the standalone cluster, where the heartbeat mechanism in
> Flink is totally relied, reducing the heartbeat can also help JM to find
> out faster TaskExecutors in abnormal conditions that can not respond to the
> heartbeat requests, e.g., continuously Full GC, though the process of
> TaskExecutor is alive and may not be known by the deployment system. Since
> there are cases that can benefit from this change, I think it could be done
> if it won't break the experience in other scenarios.
>
>
>
> If we can address what will block the main threads from processing
> heartbeats, or enlarge the GC costs, we can try to get rid of them to have
> a more predictable response time of heartbeat, or give some advices to
> users if their jobs may encounter these issues. For example, as far as I
> know JM of a large scale job will be more busy and may not able to process
> heartbeats in time, then we can give a advice that users working with job
> large than 5000 tasks should enlarge there heartbeat interval to 10s and
> timeout to 50s. The numbers are written casually.
>
>
>
> As for the issue in FLINK-23216, I think it should be fixed and may not be
> a main concern for this case.
>
>
>
> On Wed, Jul 21, 2021 at 6:26 PM Till Rohrmann 
> wrote:
>
> Thanks for sharing these insights.
>
>
>
> I think it is no longer true that the ResourceManager notifies the
> JobMaster about lost TaskExecutors. See FLINK-23216 [1] for more details.
>
>
>
> Given the GC pauses, would you then be ok with decreasing the heartbeat
> timeout to 20 seconds? This should give enough time to do the GC and then
> still send/receive a heartbeat request.
>
>
>
> I also wanted to add that we are about to get rid of one big cause of
> blocking I/O operat

RE: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-22 Thread LINZ, Arnaud
Hello,

From a user perspective: we have some (rare) use cases where we use “coarse 
grain” datasets, with big beans and tasks that do lengthy operations (such as ML 
training). In these cases we had to increase the timeout to huge values 
(heartbeat.timeout: 50) so that our app is not killed.
I’m aware this is not the way Flink was meant to be used, but it’s a convenient 
way to distribute our workload on datanodes without having to use another 
concurrency framework (such as M/R) that would require the recoding of sources 
and sinks.

In some other (most common) cases, our tasks do some R/W accesses to RAM-cached 
repositories backed by a key-value store such as Kudu (or HBase). While most of 
those calls are very fast, when the system is under heavy load they may 
sometimes block for more than a few seconds, and having our app killed because 
of a short timeout is not an option.

That’s why I’m not in favor of very short timeouts… Because in my experience it 
really depends on what user code does in the tasks. (I understand that 
normally, as user code is not a JVM-blocking activity such as a GC, it should 
have no impact on heartbeats, but from experience, it really does)

Cheers,
Arnaud
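
A sketch of the kind of override described above for coarse-grained,
long-running tasks; the actual value used is elided in the mail, so the
numbers below are placeholders only:

```yaml
# flink-conf.yaml (sketch): raise the timeout above the longest pause user code may cause
heartbeat.timeout: 300s    # placeholder value; increase until the job stops self-killing
heartbeat.interval: 10s    # keep the interval well below the timeout
```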


From: Gen Luo
Sent: Thursday, 22 July 2021 05:46
To: Till Rohrmann
Cc: Yang Wang; dev; user
Subject: Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

Hi,
Thanks for driving this @Till Rohrmann <trohrm...@apache.org>. I would 
give +1 on reducing the heartbeat timeout and interval, though I'm not sure if 
15s and 3s would be enough either.

IMO, except for the standalone cluster, where the heartbeat mechanism in Flink 
is totally relied, reducing the heartbeat can also help JM to find out faster 
TaskExecutors in abnormal conditions that can not respond to the heartbeat 
requests, e.g., continuously Full GC, though the process of TaskExecutor is 
alive and may not be known by the deployment system. Since there are cases that 
can benefit from this change, I think it could be done if it won't break the 
experience in other scenarios.

If we can address what will block the main threads from processing heartbeats, 
or enlarge the GC costs, we can try to get rid of them to have a more 
predictable response time of heartbeat, or give some advices to users if their 
jobs may encounter these issues. For example, as far as I know JM of a large 
scale job will be more busy and may not able to process heartbeats in time, 
then we can give a advice that users working with job large than 5000 tasks 
should enlarge there heartbeat interval to 10s and timeout to 50s. The numbers 
are written casually.

As for the issue in FLINK-23216, I think it should be fixed and may not be a 
main concern for this case.

On Wed, Jul 21, 2021 at 6:26 PM Till Rohrmann <trohrm...@apache.org> wrote:
Thanks for sharing these insights.

I think it is no longer true that the ResourceManager notifies the JobMaster 
about lost TaskExecutors. See FLINK-23216 [1] for more details.

Given the GC pauses, would you then be ok with decreasing the heartbeat timeout 
to 20 seconds? This should give enough time to do the GC and then still 
send/receive a heartbeat request.

I also wanted to add that we are about to get rid of one big cause of blocking 
I/O operations from the main thread. With FLINK-22483 [2] we will get rid of 
Filesystem accesses to retrieve completed checkpoints. This leaves us with one 
additional file system access from the main thread which is the one completing 
a pending checkpoint. I think it should be possible to get rid of this access 
because as Stephan said it only writes information to disk that is already 
written before. Maybe solving these two issues could ease concerns about long 
pauses of unresponsiveness of Flink.

[1] https://issues.apache.org/jira/browse/FLINK-23216
[2] https://issues.apache.org/jira/browse/FLINK-22483

Cheers,
Till

On Wed, Jul 21, 2021 at 4:58 AM Yang Wang <danrtsey...@gmail.com> wrote:
Thanks @Till Rohrmann <trohrm...@apache.org> for starting this discussion

Firstly, I try to understand the benefit of shorter heartbeat timeout. IIUC, it 
will make the JobManager aware of
TaskManager faster. However, it seems that only the standalone cluster could 
benefit from this. For Yarn and
native Kubernetes deployment, the Flink ResourceManager should get the 
TaskManager lost event in a very short time.

* About 8 seconds, 3s for Yarn NM -> Yarn RM, 5s for Yarn RM -> Flink RM
* Less than 1 second, Flink RM has a watch for all the TaskManager pods

Secondly, I am not very confident to decrease the timeout to 15s. I have 
quickly checked the TaskManager GC logs
in the past week of our internal Flink workloads and find more than 100 
10-seconds Full GC logs, but no one is bigger than 15s.
We are using CMS GC for old generation.


Best,
Yang

On Sat, Jul 17, 2021 at 1:05 AM, Till Rohrmann <trohrm...@apache.org> wrote:

Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-21 Thread Gen Luo
Hi,
Thanks for driving this @Till Rohrmann. I would
give a +1 on reducing the heartbeat timeout and interval, though I'm not sure
whether 15s and 3s would be enough either.

IMO, beyond the standalone cluster, where the heartbeat mechanism is all Flink
can rely on, reducing the heartbeat can also help the JM find out faster about
TaskExecutors in abnormal conditions that cannot respond to heartbeat requests,
e.g. continuous Full GC, even though the TaskExecutor process is alive and the
deployment system may not notice anything wrong. Since there are cases that
can benefit from this change, I think it could be done as long as it won't
break the experience in other scenarios.

If we can identify what blocks the main threads from processing
heartbeats, or what increases GC cost, we can try to get rid of those causes
to get a more predictable heartbeat response time, or give some advice to
users whose jobs may run into these issues. For example, as far as I
know the JM of a large-scale job is busier and may not be able to process
heartbeats in time, so we could advise users running jobs with more than
5000 tasks to enlarge their heartbeat interval to 10s and
timeout to 50s. The numbers are written casually.

As for the issue in FLINK-23216, I think it should be fixed and may not be
a main concern for this case.

On Wed, Jul 21, 2021 at 6:26 PM Till Rohrmann  wrote:

> Thanks for sharing these insights.
>
> I think it is no longer true that the ResourceManager notifies the
> JobMaster about lost TaskExecutors. See FLINK-23216 [1] for more details.
>
> Given the GC pauses, would you then be ok with decreasing the heartbeat
> timeout to 20 seconds? This should give enough time to do the GC and then
> still send/receive a heartbeat request.
>
> I also wanted to add that we are about to get rid of one big cause of
> blocking I/O operations from the main thread. With FLINK-22483 [2] we will
> get rid of Filesystem accesses to retrieve completed checkpoints. This
> leaves us with one additional file system access from the main thread which
> is the one completing a pending checkpoint. I think it should be possible
> to get rid of this access because as Stephan said it only writes
> information to disk that is already written before. Maybe solving these two
> issues could ease concerns about long pauses of unresponsiveness of Flink.
>
> [1] https://issues.apache.org/jira/browse/FLINK-23216
> [2] https://issues.apache.org/jira/browse/FLINK-22483
>
> Cheers,
> Till
>
> On Wed, Jul 21, 2021 at 4:58 AM Yang Wang  wrote:
>
>> Thanks @Till Rohrmann   for starting this
>> discussion
>>
>> Firstly, I try to understand the benefit of shorter heartbeat timeout.
>> IIUC, it will make the JobManager aware of
>> TaskManager faster. However, it seems that only the standalone cluster
>> could benefit from this. For Yarn and
>> native Kubernetes deployment, the Flink ResourceManager should get the
>> TaskManager lost event in a very short time.
>>
>> * About 8 seconds, 3s for Yarn NM -> Yarn RM, 5s for Yarn RM -> Flink RM
>> * Less than 1 second, Flink RM has a watch for all the TaskManager pods
>>
>> Secondly, I am not very confident to decrease the timeout to 15s. I have
>> quickly checked the TaskManager GC logs
>> in the past week of our internal Flink workloads and find more than 100
>> 10-seconds Full GC logs, but no one is bigger than 15s.
>> We are using CMS GC for old generation.
>>
>>
>> Best,
>> Yang
>>
>> On Sat, Jul 17, 2021 at 1:05 AM, Till Rohrmann wrote:
>>
>>> Hi everyone,
>>>
>>> Since Flink 1.5 we have the same heartbeat timeout and interval default
>>> values that are defined as heartbeat.timeout: 50s and heartbeat.interval:
>>> 10s. These values were mainly chosen to compensate for lengthy GC pauses
>>> and blocking operations that were executed in the main threads of Flink's
>>> components. Since then, there were quite some advancements wrt the JVM's
>>> GCs and we also got rid of a lot of blocking calls that were executed in
>>> the main thread. Moreover, a long heartbeat.timeout causes long recovery
>>> times in case of a TaskManager loss because the system can only properly
>>> recover after the dead TaskManager has been removed from the scheduler.
>>> Hence, I wanted to propose to change the timeout and interval to:
>>>
>>> heartbeat.timeout: 15s
>>> heartbeat.interval: 3s
>>>
>>> Since there is no perfect solution that fits all use cases, I would
>>> really
>>> like to hear from you what you think about it and how you configure these
>>> heartbeat options. Based on your experience we might actually come up
>>> with
>>> better default values that allow us to be resilient but also to detect
>>> failed components fast. FLIP-185 can be found here [1].
>>>
>>> [1] https://cwiki.apache.org/confluence/x/GAoBCw
>>>
>>> Cheers,
>>> Till
>>>
>>


Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-21 Thread Till Rohrmann
Thanks for sharing these insights.

I think it is no longer true that the ResourceManager notifies the
JobMaster about lost TaskExecutors. See FLINK-23216 [1] for more details.

Given the GC pauses, would you then be ok with decreasing the heartbeat
timeout to 20 seconds? This should give enough time to do the GC and then
still send/receive a heartbeat request.

I also wanted to add that we are about to get rid of one big cause of
blocking I/O operations from the main thread. With FLINK-22483 [2] we will
get rid of Filesystem accesses to retrieve completed checkpoints. This
leaves us with one additional file system access from the main thread which
is the one completing a pending checkpoint. I think it should be possible
to get rid of this access because as Stephan said it only writes
information to disk that is already written before. Maybe solving these two
issues could ease concerns about long pauses of unresponsiveness of Flink.

[1] https://issues.apache.org/jira/browse/FLINK-23216
[2] https://issues.apache.org/jira/browse/FLINK-22483

Cheers,
Till

On Wed, Jul 21, 2021 at 4:58 AM Yang Wang  wrote:

> Thanks @Till Rohrmann   for starting this discussion
>
> Firstly, I try to understand the benefit of shorter heartbeat timeout.
> IIUC, it will make the JobManager aware of
> TaskManager faster. However, it seems that only the standalone cluster
> could benefit from this. For Yarn and
> native Kubernetes deployment, the Flink ResourceManager should get the
> TaskManager lost event in a very short time.
>
> * About 8 seconds, 3s for Yarn NM -> Yarn RM, 5s for Yarn RM -> Flink RM
> * Less than 1 second, Flink RM has a watch for all the TaskManager pods
>
> Secondly, I am not very confident to decrease the timeout to 15s. I have
> quickly checked the TaskManager GC logs
> in the past week of our internal Flink workloads and find more than 100
> 10-seconds Full GC logs, but no one is bigger than 15s.
> We are using CMS GC for old generation.
>
>
> Best,
> Yang
>
> On Sat, Jul 17, 2021 at 1:05 AM, Till Rohrmann wrote:
>
>> Hi everyone,
>>
>> Since Flink 1.5 we have the same heartbeat timeout and interval default
>> values that are defined as heartbeat.timeout: 50s and heartbeat.interval:
>> 10s. These values were mainly chosen to compensate for lengthy GC pauses
>> and blocking operations that were executed in the main threads of Flink's
>> components. Since then, there were quite some advancements wrt the JVM's
>> GCs and we also got rid of a lot of blocking calls that were executed in
>> the main thread. Moreover, a long heartbeat.timeout causes long recovery
>> times in case of a TaskManager loss because the system can only properly
>> recover after the dead TaskManager has been removed from the scheduler.
>> Hence, I wanted to propose to change the timeout and interval to:
>>
>> heartbeat.timeout: 15s
>> heartbeat.interval: 3s
>>
>> Since there is no perfect solution that fits all use cases, I would really
>> like to hear from you what you think about it and how you configure these
>> heartbeat options. Based on your experience we might actually come up with
>> better default values that allow us to be resilient but also to detect
>> failed components fast. FLIP-185 can be found here [1].
>>
>> [1] https://cwiki.apache.org/confluence/x/GAoBCw
>>
>> Cheers,
>> Till
>>
>


Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-20 Thread Yang Wang
Thanks @Till Rohrmann   for starting this discussion

Firstly, I try to understand the benefit of a shorter heartbeat timeout.
IIUC, it will make the JobManager aware of a lost
TaskManager faster. However, it seems that only the standalone cluster
could benefit from this. For Yarn and
native Kubernetes deployment, the Flink ResourceManager should get the
TaskManager lost event in a very short time.

* About 8 seconds, 3s for Yarn NM -> Yarn RM, 5s for Yarn RM -> Flink RM
* Less than 1 second, Flink RM has a watch for all the TaskManager pods

Secondly, I am not very confident about decreasing the timeout to 15s. I have
quickly checked the TaskManager GC logs
of our internal Flink workloads over the past week and found more than 100
Full GCs of around 10 seconds, but none longer than 15s.
We are using CMS GC for the old generation.


Best,
Yang
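
One way to gather the kind of evidence described above is to enable GC logging
for the TaskManager JVMs and compare the observed pause times against the
proposed timeout; a sketch for a Java 8 / CMS setup (the exact flag set is an
assumption, and the log path is a placeholder):

```yaml
# flink-conf.yaml (sketch): assuming Java 8 with CMS for the old generation
env.java.opts.taskmanager: -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -Xloggc:/tmp/taskmanager-gc.log
```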

On Sat, Jul 17, 2021 at 1:05 AM, Till Rohrmann wrote:

> Hi everyone,
>
> Since Flink 1.5 we have the same heartbeat timeout and interval default
> values that are defined as heartbeat.timeout: 50s and heartbeat.interval:
> 10s. These values were mainly chosen to compensate for lengthy GC pauses
> and blocking operations that were executed in the main threads of Flink's
> components. Since then, there were quite some advancements wrt the JVM's
> GCs and we also got rid of a lot of blocking calls that were executed in
> the main thread. Moreover, a long heartbeat.timeout causes long recovery
> times in case of a TaskManager loss because the system can only properly
> recover after the dead TaskManager has been removed from the scheduler.
> Hence, I wanted to propose to change the timeout and interval to:
>
> heartbeat.timeout: 15s
> heartbeat.interval: 3s
>
> Since there is no perfect solution that fits all use cases, I would really
> like to hear from you what you think about it and how you configure these
> heartbeat options. Based on your experience we might actually come up with
> better default values that allow us to be resilient but also to detect
> failed components fast. FLIP-185 can be found here [1].
>
> [1] https://cwiki.apache.org/confluence/x/GAoBCw
>
> Cheers,
> Till
>


Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-20 Thread Robert Metzger
+1 to this change!

When I was working on the reactive mode blog post [1] I also ran into this
issue, leading to a poor "out of the box" experience when scaling down.
For my experiments, I've chosen a timeout of 8 seconds, and the cluster has
been running for 76 days (so far) on Kubernetes.
I also consider this change somewhat low-risk, because we can provide a
quick fix for people running into problems.

[1] https://flink.apache.org/2021/05/06/reactive-mode.html


On Fri, Jul 16, 2021 at 7:05 PM Till Rohrmann  wrote:

> Hi everyone,
>
> Since Flink 1.5 we have the same heartbeat timeout and interval default
> values that are defined as heartbeat.timeout: 50s and heartbeat.interval:
> 10s. These values were mainly chosen to compensate for lengthy GC pauses
> and blocking operations that were executed in the main threads of Flink's
> components. Since then, there were quite some advancements wrt the JVM's
> GCs and we also got rid of a lot of blocking calls that were executed in
> the main thread. Moreover, a long heartbeat.timeout causes long recovery
> times in case of a TaskManager loss because the system can only properly
> recover after the dead TaskManager has been removed from the scheduler.
> Hence, I wanted to propose to change the timeout and interval to:
>
> heartbeat.timeout: 15s
> heartbeat.interval: 3s
>
> Since there is no perfect solution that fits all use cases, I would really
> like to hear from you what you think about it and how you configure these
> heartbeat options. Based on your experience we might actually come up with
> better default values that allow us to be resilient but also to detect
> failed components fast. FLIP-185 can be found here [1].
>
> [1] https://cwiki.apache.org/confluence/x/GAoBCw
>
> Cheers,
> Till
>


[DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-16 Thread Till Rohrmann
Hi everyone,

Since Flink 1.5 we have the same heartbeat timeout and interval default
values that are defined as heartbeat.timeout: 50s and heartbeat.interval:
10s. These values were mainly chosen to compensate for lengthy GC pauses
and blocking operations that were executed in the main threads of Flink's
components. Since then, there were quite some advancements wrt the JVM's
GCs and we also got rid of a lot of blocking calls that were executed in
the main thread. Moreover, a long heartbeat.timeout causes long recovery
times in case of a TaskManager loss because the system can only properly
recover after the dead TaskManager has been removed from the scheduler.
Hence, I wanted to propose to change the timeout and interval to:

heartbeat.timeout: 15s
heartbeat.interval: 3s

Since there is no perfect solution that fits all use cases, I would really
like to hear from you what you think about it and how you configure these
heartbeat options. Based on your experience we might actually come up with
better default values that allow us to be resilient but also to detect
failed components fast. FLIP-185 can be found here [1].

[1] https://cwiki.apache.org/confluence/x/GAoBCw

Cheers,
Till
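
For reference, the two options under discussion map onto flink-conf.yaml as
follows; the current defaults and the proposed values are taken directly from
the proposal above:

```yaml
# flink-conf.yaml: current defaults (since Flink 1.5)
heartbeat.timeout: 50s     # time after which an unresponsive component is declared dead
heartbeat.interval: 10s    # how often heartbeat requests are sent

# Values proposed by FLIP-185:
# heartbeat.timeout: 15s
# heartbeat.interval: 3s
```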


Re: Heartbeat Timeout

2021-05-28 Thread Robert Cullen
Matthias,  I increased the JVM Heap size as Jan suggested and it appears to
be a memory leak in the user code (although I'm not sure why since this is
a simple job that uses a loop to simulate data being written to an S3 data
store).  Yes, the logs show no apparent problem but the timestamp
corresponds to the job failure.  Forgive me but I don't know how to analyze
a heap dump.
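
One low-effort way to get a heap dump to analyze (a sketch; the flags are
standard HotSpot options and the dump path is a placeholder) is to let the
TaskManager JVM write one when it runs out of memory, and then open the
resulting .hprof file in a tool such as Eclipse MAT or VisualVM:

```yaml
# flink-conf.yaml (sketch): dump is written when the TaskManager JVM hits an OutOfMemoryError
env.java.opts.taskmanager: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/taskmanager-heap.hprof
```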

On Fri, May 28, 2021 at 8:27 AM Matthias Pohl 
wrote:

> Hi Robert,
> increasing heap memory usage could be due to some memory leak in the user
> code. Have you analyzed a heap dump? About the TM logs you shared. I don't
> see anything suspicious there. Nothing about memory problems. Are those the
> correct logs?
>
> Best,
> Matthias
>
> On Thu, May 27, 2021 at 6:01 PM Jan Brusch 
> wrote:
>
>> Hi Robert,
>>
>> that sounds like a case of either your application state ultimately being
>> bigger than the available RAM or a memory leak in your application (e.g.,
>> some states are not properly cleaned out after they are not needed anymore).
>>
>> If you have the available resources you could try and increase the
>> TaskManager RAM size by a large amount and see where RAM usage tops out. If
>> it ever does... in case of a memory leak it would grow indefinitely. Then
>> you could reason about how to fix the memory leak or how to achieve your
>> goal with a smaller application state.
>>
>> A remedy for application states larger than your available RAM is to use
>> the RocksDB State backend, which allows for states larger than your
>> application RAM. But that requires your kubernetes nodes to be equipped
>> with a fast hard drive (SSD, optimally).
>>
>> That's how I would approach your problem...
>>
>>
>> Hope that helps
>>
>> Jan
>> On 27.05.21 17:51, Robert Cullen wrote:
>>
>> Hello Jan,
>>
>> My flink cluster is running on a kubernetes single node (rke). I have the
>> JVM Heap Size set at 2.08 GB and the Managed Memory at 2.93 GB. The
>> TaskManger reaches the max JVM Heap Size after about one hour then fails.
>> Here is a snippet from the TaskManager log:
>>
>> 2021-05-27 15:36:36,040 INFO  
>> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved 
>> JobManager address, beginning registration
>> 2021-05-27 15:36:36,041 INFO  
>> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - 
>> Successful registration at job manager 
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 for job 
>> c5ff9686e944f62a24c10c6bf20a5a55.
>> 2021-05-27 15:36:36,042 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Establish 
>> JobManager connection for job c5ff9686e944f62a24c10c6bf20a5a55.
>> 2021-05-27 15:36:36,042 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Offer 
>> reserved slots to the leader of job c5ff9686e944f62a24c10c6bf20a5a55.
>> 2021-05-27 15:36:36,042 INFO  
>> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot 
>> TaskSlot(index:2, state:ALLOCATED, resource profile: 
>> ResourceProfile{cpuCores=1., taskHeapMemory=500.000mb 
>> (524288000 bytes), taskOffHeapMemory=0 bytes, managedMemory=750.000mb 
>> (786432000 bytes), networkMemory=146.000mb (153092098 bytes)}, allocationId: 
>> 2f2e7abd16f21e156cab15cfa0d1d090, jobId: c5ff9686e944f62a24c10c6bf20a5a55).
>> 2021-05-27 15:36:36,042 INFO  
>> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove 
>> job c5ff9686e944f62a24c10c6bf20a5a55 from job leader monitoring.
>> 2021-05-27 15:36:36,042 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Close 
>> JobManager connection for job c5ff9686e944f62a24c10c6bf20a5a55.
>> 2021-05-27 15:36:36,043 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Receive 
>> slot request 85433366f8bf1c5efd3b88f634676764 for job 
>> c5ff9686e944f62a24c10c6bf20a5a55 from resource manager with leader id 
>> .
>> 2021-05-27 15:36:36,043 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Allocated 
>> slot for 85433366f8bf1c5efd3b88f634676764.
>> 2021-05-27 15:36:36,043 INFO  
>> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job 
>> c5ff9686e944f62a24c10c6bf20a5a55 for job leader monitoring.
>> 2021-05-27 15:36:36,043 INFO  
>> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to 
>> register at job manager 
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 with leader id 
>> ----.
>> 2021-05-27 15:36:36,044 INFO  
>> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved 
>> JobManager address, beginning registration
>> 2021-05-27 15:36:36,045 INFO  
>> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - 
>> Successful registration at job manager 
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 for job 
>> c5ff9686e944f62a24c10c6bf20a5a55.
>>
>> I guess the 

Re: Heartbeat Timeout

2021-05-28 Thread Matthias Pohl
Hi Robert,
increasing heap memory usage could be due to some memory leak in the user
code. Have you analyzed a heap dump? About the TM logs you shared. I don't
see anything suspicious there. Nothing about memory problems. Are those the
correct logs?

Best,
Matthias

On Thu, May 27, 2021 at 6:01 PM Jan Brusch 
wrote:

> Hi Robert,
>
> that sounds like a case of either your application state ultimately being
> bigger than the available RAM or a memory leak in your application (e.g.,
> some states are not properly cleaned out after they are not needed anymore).
>
> If you have the available resources you could try and increase the
> TaskManager RAM size by a large amount and see where RAM usage tops out. If
> it ever does... in case of a memory leak it would grow indefinitely. Then
> you could reason about how to fix the memory leak or how to achieve your
> goal with a smaller application state.
>
> A remedy for application states larger than your available RAM is to use
> the RocksDB State backend, which allows for states larger than your
> application RAM. But that requires your kubernetes nodes to be equipped
> with a fast hard drive (SSD, optimally).
>
> That's how I would approach your problem...
>
>
> Hope that helps
>
> Jan
> On 27.05.21 17:51, Robert Cullen wrote:
>
> Hello Jan,
>
> My flink cluster is running on a kubernetes single node (rke). I have the
> JVM Heap Size set at 2.08 GB and the Managed Memory at 2.93 GB. The
> TaskManager reaches the max JVM Heap Size after about one hour, then fails.
> Here is a snippet from the TaskManager log:
>
> 2021-05-27 15:36:36,040 INFO  
> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved 
> JobManager address, beginning registration
> 2021-05-27 15:36:36,041 INFO  
> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful 
> registration at job manager 
> akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 for job 
> c5ff9686e944f62a24c10c6bf20a5a55.
> 2021-05-27 15:36:36,042 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Establish 
> JobManager connection for job c5ff9686e944f62a24c10c6bf20a5a55.
> 2021-05-27 15:36:36,042 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Offer 
> reserved slots to the leader of job c5ff9686e944f62a24c10c6bf20a5a55.
> 2021-05-27 15:36:36,042 INFO  
> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot 
> TaskSlot(index:2, state:ALLOCATED, resource profile: 
> ResourceProfile{cpuCores=1., taskHeapMemory=500.000mb 
> (524288000 bytes), taskOffHeapMemory=0 bytes, managedMemory=750.000mb 
> (786432000 bytes), networkMemory=146.000mb (153092098 bytes)}, allocationId: 
> 2f2e7abd16f21e156cab15cfa0d1d090, jobId: c5ff9686e944f62a24c10c6bf20a5a55).
> 2021-05-27 15:36:36,042 INFO  
> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 
> c5ff9686e944f62a24c10c6bf20a5a55 from job leader monitoring.
> 2021-05-27 15:36:36,042 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Close 
> JobManager connection for job c5ff9686e944f62a24c10c6bf20a5a55.
> 2021-05-27 15:36:36,043 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Receive 
> slot request 85433366f8bf1c5efd3b88f634676764 for job 
> c5ff9686e944f62a24c10c6bf20a5a55 from resource manager with leader id 
> .
> 2021-05-27 15:36:36,043 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Allocated 
> slot for 85433366f8bf1c5efd3b88f634676764.
> 2021-05-27 15:36:36,043 INFO  
> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job 
> c5ff9686e944f62a24c10c6bf20a5a55 for job leader monitoring.
> 2021-05-27 15:36:36,043 INFO  
> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to 
> register at job manager 
> akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 with leader id 
> ----.
> 2021-05-27 15:36:36,044 INFO  
> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved 
> JobManager address, beginning registration
> 2021-05-27 15:36:36,045 INFO  
> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful 
> registration at job manager 
> akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 for job 
> c5ff9686e944f62a24c10c6bf20a5a55.
>
> I guess the simple resolution is to increase the JVM Heap Size?
>
> On Thu, May 27, 2021 at 10:51 AM Jan Brusch 
> wrote:
>
>> Hi Robert,
>>
>> do you have some additional info? For example the last log message of the
>> unreachable TaskManagers. Is the Job running in kubernetes? What backend
>> are you using?
>>
>> From the first looks of it, I have seen this behaviour mostly in cases
>> where one or more taskmanagers shut down due to GarbageCollection issues or
>> OutOfMemory-Errors.
>>
>>
>> Best regards
>>
>> Jan
>> On 27.05.21 16:44, Robert Cul
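
For the RocksDB state backend suggested by Jan above, a minimal configuration
sketch (option names as in Flink 1.13; the checkpoint location is a placeholder):

```yaml
# flink-conf.yaml (sketch): keep state larger than the available RAM on local disk (ideally SSD)
state.backend: rocksdb
state.backend.incremental: true                           # only ship changed files on checkpoints
state.checkpoints.dir: s3://my-bucket/flink-checkpoints   # placeholder location
```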

Re: Heartbeat Timeout

2021-05-27 Thread wangwj
Hi, I have encountered the same problem.
Check the GC log and a jstack thread dump, and you will resolve this problem.
Good luck



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Heartbeat Timeout

2021-05-27 Thread JING ZHANG
Hi Robert Cullen,
1. You may solve the problem by increasing the timeout via
`heartbeat.timeout` [1]; however, I do not recommend this because it
will hide the real problem.
2. Please find the GC log and the log of the timed-out TaskManager
(10.42.0.49:6122-e26293) and check: 1. is there a memory leak? 2. is there a
Full GC near the moment `2021-05-27 10:24:21`? Maybe you could share the logs here.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#advanced-fault-tolerance-options

Best,
JING ZHANG


On Thu, May 27, 2021 at 10:44 PM, Robert Cullen wrote:

> I have a job that fails after @1 hour due to a TaskManager Timeout. How
> can I prevent this from happening?
>
> 2021-05-27 10:24:21
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
> at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
> at 
> org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
> at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462)
> at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1139)
> at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1079)
> at 
> org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:783)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
> at 
> org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
> at 
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
> at 
> java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
> at 
> java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
> at 
> org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:385)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:361)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:497)
> at 
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1295)
> at 
> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.a

Re: Heartbeat Timeout

2021-05-27 Thread Yangze Guo
Hi Robert,

To mitigate this issue, you can increase the "heartbeat.interval" and
"heartbeat.timeout" settings. However, I think we should first figure out
the root cause. Could you provide the log of 10.42.0.49:6122-e26293?
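
For example, in flink-conf.yaml (values are in milliseconds; the numbers
below only illustrate raising the timeout and are not recommended
settings):

heartbeat.interval: 10000
heartbeat.timeout: 180000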

Best,
Yangze Guo

On Thu, May 27, 2021 at 10:44 PM Robert Cullen  wrote:
>
> I have a job that fails after @1 hour due to a TaskManager Timeout. How can I 
> prevent this from happening?
>
> 2021-05-27 10:24:21
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
> at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
> at 
> org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
> at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462)
> at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1139)
> at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1079)
> at 
> org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:783)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
> at 
> org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
> at 
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
> at 
> java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
> at 
> java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
> at 
> org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:385)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:361)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
> at 
> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:497)
> at 
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1295)
> at 
> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
> at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala

Re: Heartbeat Timeout

2021-05-27 Thread Jan Brusch

Hi Robert,

that sounds like a case of either your application state ultimately
being bigger than the available RAM, or a memory leak in your application
(e.g., some states are not properly cleaned up once they are no longer
needed).
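
If the suspected leak is keyed state that is never cleared, state TTL can
clean it up automatically. A minimal sketch (the function and state names
are made up for illustration; TTL only applies to keyed state):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class DedupFunction extends RichFlatMapFunction<String, String> {

    private transient ValueState<Long> lastSeen;

    @Override
    public void open(Configuration parameters) {
        // Expire entries that have not been written for 24h, so abandoned
        // keys are eventually removed instead of accumulating on the heap.
        StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.hours(24))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        ValueStateDescriptor<Long> desc =
                new ValueStateDescriptor<>("lastSeen", Long.class);
        desc.enableTimeToLive(ttl);
        lastSeen = getRuntimeContext().getState(desc);
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        // Emit only the first occurrence of each key; the TTL makes sure the
        // per-key marker does not live forever.
        if (lastSeen.value() == null) {
            out.collect(value);
        }
        lastSeen.update(System.currentTimeMillis());
    }
}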


If you have the resources available, you could try increasing the
TaskManager RAM by a large amount and see where RAM usage tops out, if it
ever does... in the case of a memory leak it would grow indefinitely. Then
you could reason about how to fix the leak, or how to achieve your goal
with a smaller application state.
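
With the unified memory model (Flink 1.10 and later), that experiment is a
single setting in flink-conf.yaml; 8g is only an example value:

taskmanager.memory.process.size: 8g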


A remedy for application state larger than your available RAM is to use
the RocksDB state backend, which keeps state on local disk and therefore
allows state larger than the TaskManager RAM. But that requires your
Kubernetes nodes to be equipped with fast local disks (ideally SSDs).
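
Switching is a configuration change; the checkpoint directory below is a
placeholder and assumes some durable storage (HDFS, S3, ...) reachable
from the pods:

state.backend: rocksdb
state.checkpoints.dir: s3://<your-bucket>/flink-checkpoints
state.backend.incremental: true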


That's how I would approach your problem...


Hope that helps

Jan

On 27.05.21 17:51, Robert Cullen wrote:


Hello Jan,

My Flink cluster is running on a single-node Kubernetes cluster (RKE). I
have the JVM Heap Size set at 2.08 GB and the Managed Memory at 2.93 GB.
The TaskManager reaches the max JVM Heap Size after about one hour and
then fails. Here is a snippet from the TaskManager log:


|2021-05-27 15:36:36,040 INFO 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - 
Resolved JobManager address, beginning registration 2021-05-27 
15:36:36,041 INFO 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - 
Successful registration at job manager 
akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 for job 
c5ff9686e944f62a24c10c6bf20a5a55. 2021-05-27 15:36:36,042 INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Establish 
JobManager connection for job c5ff9686e944f62a24c10c6bf20a5a55. 
2021-05-27 15:36:36,042 INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Offer reserved 
slots to the leader of job c5ff9686e944f62a24c10c6bf20a5a55. 
2021-05-27 15:36:36,042 INFO 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free 
slot TaskSlot(index:2, state:ALLOCATED, resource profile: 
ResourceProfile{cpuCores=1., taskHeapMemory=500.000mb 
(524288000 bytes), taskOffHeapMemory=0 bytes, managedMemory=750.000mb 
(786432000 bytes), networkMemory=146.000mb (153092098 bytes)}, 
allocationId: 2f2e7abd16f21e156cab15cfa0d1d090, jobId: 
c5ff9686e944f62a24c10c6bf20a5a55). 2021-05-27 15:36:36,042 INFO 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - 
Remove job c5ff9686e944f62a24c10c6bf20a5a55 from job leader 
monitoring. 2021-05-27 15:36:36,042 INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close 
JobManager connection for job c5ff9686e944f62a24c10c6bf20a5a55. 
2021-05-27 15:36:36,043 INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Receive slot 
request 85433366f8bf1c5efd3b88f634676764 for job 
c5ff9686e944f62a24c10c6bf20a5a55 from resource manager with leader id 
. 2021-05-27 15:36:36,043 INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Allocated slot 
for 85433366f8bf1c5efd3b88f634676764. 2021-05-27 15:36:36,043 INFO 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add 
job c5ff9686e944f62a24c10c6bf20a5a55 for job leader monitoring. 
2021-05-27 15:36:36,043 INFO 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try 
to register at job manager 
akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 with 
leader id ----. 2021-05-27 
15:36:36,044 INFO 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - 
Resolved JobManager address, beginning registration 2021-05-27 
15:36:36,045 INFO 
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - 
Successful registration at job manager 
akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 for job 
c5ff9686e944f62a24c10c6bf20a5a55. |


I guess the simple resolution is to increase the JVM Heap Size?


On Thu, May 27, 2021 at 10:51 AM Jan Brusch wrote:


Hi Robert,

do you have some additional info? For example the last log message
of the unreachable TaskManagers. Is the Job running in kubernetes?
What backend are you using?

From the first looks of it, I have seen this behaviour mostly in
cases where one or more taskmanagers shut down due to
GarbageCollection issues or OutOfMemory-Errors.


Best regards

Jan

On 27.05.21 16:44, Robert Cullen wrote:


I have a job that fails after @1 hour due to a TaskManager
Timeout. How can I prevent this from happening?

|2021-05-27 10:24:21 org.apache.flink.runtime.JobException:
Recovery is suppressed by NoRestartBackoffTimeStrategy at

org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at

org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandle

Re: Heartbeat Timeout

2021-05-27 Thread Robert Cullen
Hello Jan,

My Flink cluster is running on a single-node Kubernetes cluster (RKE). I
have the JVM Heap Size set at 2.08 GB and the Managed Memory at 2.93 GB.
The TaskManager reaches the max JVM Heap Size after about one hour and
then fails. Here is a snippet from the TaskManager log:

2021-05-27 15:36:36,040 INFO
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] -
Resolved JobManager address, beginning registration
2021-05-27 15:36:36,041 INFO
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] -
Successful registration at job manager
akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 for job
c5ff9686e944f62a24c10c6bf20a5a55.
2021-05-27 15:36:36,042 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] -
Establish JobManager connection for job
c5ff9686e944f62a24c10c6bf20a5a55.
2021-05-27 15:36:36,042 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] -
Offer reserved slots to the leader of job
c5ff9686e944f62a24c10c6bf20a5a55.
2021-05-27 15:36:36,042 INFO
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free
slot TaskSlot(index:2, state:ALLOCATED, resource profile:
ResourceProfile{cpuCores=1., taskHeapMemory=500.000mb
(524288000 bytes), taskOffHeapMemory=0 bytes, managedMemory=750.000mb
(786432000 bytes), networkMemory=146.000mb (153092098 bytes)},
allocationId: 2f2e7abd16f21e156cab15cfa0d1d090, jobId:
c5ff9686e944f62a24c10c6bf20a5a55).
2021-05-27 15:36:36,042 INFO
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] -
Remove job c5ff9686e944f62a24c10c6bf20a5a55 from job leader
monitoring.
2021-05-27 15:36:36,042 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] -
Close JobManager connection for job c5ff9686e944f62a24c10c6bf20a5a55.
2021-05-27 15:36:36,043 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] -
Receive slot request 85433366f8bf1c5efd3b88f634676764 for job
c5ff9686e944f62a24c10c6bf20a5a55 from resource manager with leader id
.
2021-05-27 15:36:36,043 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] -
Allocated slot for 85433366f8bf1c5efd3b88f634676764.
2021-05-27 15:36:36,043 INFO
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add
job c5ff9686e944f62a24c10c6bf20a5a55 for job leader monitoring.
2021-05-27 15:36:36,043 INFO
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try
to register at job manager
akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 with
leader id ----.
2021-05-27 15:36:36,044 INFO
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] -
Resolved JobManager address, beginning registration
2021-05-27 15:36:36,045 INFO
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] -
Successful registration at job manager
akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 for job
c5ff9686e944f62a24c10c6bf20a5a55.

I guess the simple resolution is to increase the JVM Heap Size?
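
If so, these would be the knobs in flink-conf.yaml, if I understand the
memory model correctly: managed memory is off-heap, so it is the task heap
(or the total process size) that would need to grow. The sizes below are
just an example:

# either raise the total TaskManager size ...
taskmanager.memory.process.size: 6g
# ... or steer the heap / managed split explicitly
taskmanager.memory.task.heap.size: 3g
taskmanager.memory.managed.size: 2g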

On Thu, May 27, 2021 at 10:51 AM Jan Brusch 
wrote:

> Hi Robert,
>
> do you have some additional info? For example the last log message of the
> unreachable TaskManagers. Is the Job running in kubernetes? What backend
> are you using?
>
> From the first looks of it, I have seen this behaviour mostly in cases
> where one or more taskmanagers shut down due to GarbageCollection issues or
> OutOfMemory-Errors.
>
>
> Best regards
>
> Jan
> On 27.05.21 16:44, Robert Cullen wrote:
>
> I have a job that fails after @1 hour due to a TaskManager Timeout. How
> can I prevent this from happening?
>
> 2021-05-27 10:24:21
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
> at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
> at 
> org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
> at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462)
> at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1139)
> at 
> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution

Fwd: Heartbeat Timeout

2021-05-27 Thread Jan Brusch

Hi Robert,

do you have some additional info? For example, the last log message of
the unreachable TaskManagers. Is the job running in Kubernetes? What
state backend are you using?


At first glance, I have seen this behaviour mostly in cases where one or
more TaskManagers shut down due to garbage collection issues or
OutOfMemoryErrors.
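
One way to confirm that is to enable GC logging for the TaskManagers, e.g.
via flink-conf.yaml (the log path is a placeholder; pick the flags that
match your JDK):

# JDK 8
env.java.opts.taskmanager: "-XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/opt/flink/log/gc.log"
# JDK 11 and later
# env.java.opts.taskmanager: "-Xlog:gc*:file=/opt/flink/log/gc.log"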



Best regards

Jan

On 27.05.21 16:44, Robert Cullen wrote:


I have a job that fails after @1 hour due to a TaskManager Timeout. 
How can I prevent this from happening?


|2021-05-27 10:24:21 org.apache.flink.runtime.JobException: Recovery 
is suppressed by NoRestartBackoffTimeStrategy at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) 
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) 
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207) 
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197) 
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188) 
at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677) 
at 
org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51) 
at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462) 
at 
org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1139) 
at 
org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1079) 
at 
org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:783) 
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195) 
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182) 
at 
org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271) 
at 
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) 
at 
java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683) 
at 
java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010) 
at 
org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271) 
at 
org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152) 
at 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:385) 
at 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:361) 
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249) 
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230) 
at 
org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:497) 
at 
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1295) 
at 
org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111) 
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) 
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) 
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) 
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) 
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at 
scala.PartialFunction.applyOrElse(PartialFunction.scala:123) at 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at 
akka.actor.Actor.aroundReceive(Actor.scala:517) at 
akka.actor.Actor.aroundReceive$(Actor.scala:515) at 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at 
akka.actor.ActorCell.invoke(ActorCell.scala:561) at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at 
akka.dispatch.Mailbox.run(Mailbox.scala:225) at 
akka.dispatch.Mailbox.exec(Mailbox.scala:235) at 
akka.dispatch.

Heartbeat Timeout

2021-05-27 Thread Robert Cullen
I have a job that fails after @1 hour due to a TaskManager Timeout. How can
I prevent this from happening?

2021-05-27 10:24:21
org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
at 
org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462)
at 
org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1139)
at 
org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1079)
at 
org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:783)
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
at 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
at 
org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
at 
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
at 
java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
at 
java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
at 
org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
at 
org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
at 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:385)
at 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:361)
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
at 
org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
at 
org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:497)
at 
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1295)
at 
org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.r

Re: TM heartbeat timeout due to ResourceManager being busy

2020-10-11 Thread Xintong Song
No worries :)


Thank you~

Xintong Song



On Mon, Oct 12, 2020 at 2:48 PM Paul Lam  wrote:

> Sorry for the misspelled name, Xintong
>
> Best,
> Paul Lam
>
> On Oct 12, 2020, at 14:46, Paul Lam wrote:
>
> Hi Xingtong,
>
> Thanks a lot for the pointer!
>
> It’s good to see there would be a new IO executor to take care of the TM
> contexts. Looking forward to the 1.12 release!
>
> Best,
> Paul Lam
>
> On Oct 12, 2020, at 14:18, Xintong Song wrote:
>
> Hi Paul,
>
> Thanks for reporting this.
>
> Indeed, Flink's RM currently performs several HDFS operations in the rpc
> main thread when preparing the TM context, which may block the main thread
> when HDFS is slow.
>
> Unfortunately, I don't see any out-of-box approach that fixes the problem
> at the moment, except for increasing the heartbeat timeout.
>
> As for the long run solution, I think there's an easier approach. We can
> move creating of the TM contexts away from the rpc main thread. Ideally, we
> should try to avoid performing any heavy operations which do not modify the
> RM's internal states in the rpc main thread. With FLINK-19241, this can be
> achieved easily by delegating the work to the io executor.
>
> Thank you~
> Xintong Song
>
>
>
> On Mon, Oct 12, 2020 at 12:44 PM Paul Lam  wrote:
>
>> Hi,
>>
>> After FLINK-13184 is implemented (even with Flink 1.11), occasionally
>> there would still be jobs
>> with high parallelism getting TM-RM heartbeat timeouts when RM is busy
>> creating TM contexts
>> on cluster initialization and HDFS is slow at that moment.
>>
>> Apart from increasing the TM heartbeat timeout, is there any recommended
>>  out of the box
>> approach that can reduce the chance of getting the timeouts?
>>
>> In the long run, is it possible to limit the number of taskmanager
>> contexts that RM creates at
>> a time, so that the heartbeat triggers can chime in?
>>
>> Thanks!
>>
>> Best,
>> Paul Lam
>>
>
>
>


Re: TM heartbeat timeout due to ResourceManager being busy

2020-10-11 Thread Paul Lam
Sorry for the misspelled name, Xintong

Best,
Paul Lam

> On Oct 12, 2020, at 14:46, Paul Lam wrote:
> 
> Hi Xingtong,
> 
> Thanks a lot for the pointer!
> 
> It’s good to see there would be a new IO executor to take care of the TM 
> contexts. Looking forward to the 1.12 release!
> 
> Best,
> Paul Lam
> 
>> On Oct 12, 2020, at 14:18, Xintong Song <tonysong...@gmail.com> wrote:
>> 
>> Hi Paul,
>> 
>> Thanks for reporting this.
>> 
>> Indeed, Flink's RM currently performs several HDFS operations in the rpc 
>> main thread when preparing the TM context, which may block the main thread 
>> when HDFS is slow.
>> 
>> Unfortunately, I don't see any out-of-box approach that fixes the problem at 
>> the moment, except for increasing the heartbeat timeout.
>> 
>> As for the long run solution, I think there's an easier approach. We can 
>> move creating of the TM contexts away from the rpc main thread. Ideally, we 
>> should try to avoid performing any heavy operations which do not modify the 
>> RM's internal states in the rpc main thread. With FLINK-19241, this can be 
>> achieved easily by delegating the work to the io executor.
>> 
>> Thank you~
>> Xintong Song
>> 
>> 
>> On Mon, Oct 12, 2020 at 12:44 PM Paul Lam <paullin3...@gmail.com> wrote:
>> Hi,
>> 
>> After FLINK-13184 is implemented (even with Flink 1.11), occasionally there 
>> would still be jobs 
>> with high parallelism getting TM-RM heartbeat timeouts when RM is busy 
>> creating TM contexts 
>> on cluster initialization and HDFS is slow at that moment. 
>> 
>> Apart from increasing the TM heartbeat timeout, is there any recommended  
>> out of the box 
>> approach that can reduce the chance of getting the timeouts? 
>> 
>> In the long run, is it possible to limit the number of taskmanager contexts 
>> that RM creates at 
>> a time, so that the heartbeat triggers can chime in? 
>> 
>> Thanks!
>> 
>> Best,
>> Paul Lam
> 



Re: TM heartbeat timeout due to ResourceManager being busy

2020-10-11 Thread Paul Lam
Hi Xingtong,

Thanks a lot for the pointer!

It’s good to see there would be a new IO executor to take care of the TM 
contexts. Looking forward to the 1.12 release!

Best,
Paul Lam

> On Oct 12, 2020, at 14:18, Xintong Song wrote:
> 
> Hi Paul,
> 
> Thanks for reporting this.
> 
> Indeed, Flink's RM currently performs several HDFS operations in the rpc main 
> thread when preparing the TM context, which may block the main thread when 
> HDFS is slow.
> 
> Unfortunately, I don't see any out-of-box approach that fixes the problem at 
> the moment, except for increasing the heartbeat timeout.
> 
> As for the long run solution, I think there's an easier approach. We can move 
> creating of the TM contexts away from the rpc main thread. Ideally, we should 
> try to avoid performing any heavy operations which do not modify the RM's 
> internal states in the rpc main thread. With FLINK-19241, this can be 
> achieved easily by delegating the work to the io executor.
> 
> Thank you~
> Xintong Song
> 
> 
> On Mon, Oct 12, 2020 at 12:44 PM Paul Lam <paullin3...@gmail.com> wrote:
> Hi,
> 
> After FLINK-13184 is implemented (even with Flink 1.11), occasionally there 
> would still be jobs 
> with high parallelism getting TM-RM heartbeat timeouts when RM is busy 
> creating TM contexts 
> on cluster initialization and HDFS is slow at that moment. 
> 
> Apart from increasing the TM heartbeat timeout, is there any recommended  out 
> of the box 
> approach that can reduce the chance of getting the timeouts? 
> 
> In the long run, is it possible to limit the number of taskmanager contexts 
> that RM creates at 
> a time, so that the heartbeat triggers can chime in? 
> 
> Thanks!
> 
> Best,
> Paul Lam



Re: TM heartbeat timeout due to ResourceManager being busy

2020-10-11 Thread Xintong Song
FYI, I just created FLINK-19568 for tracking this issue.


Thank you~

Xintong Song


[1] https://issues.apache.org/jira/browse/FLINK-19568

On Mon, Oct 12, 2020 at 2:18 PM Xintong Song  wrote:

> Hi Paul,
>
> Thanks for reporting this.
>
> Indeed, Flink's RM currently performs several HDFS operations in the rpc
> main thread when preparing the TM context, which may block the main thread
> when HDFS is slow.
>
> Unfortunately, I don't see any out-of-box approach that fixes the problem
> at the moment, except for increasing the heartbeat timeout.
>
> As for the long run solution, I think there's an easier approach. We can
> move creating of the TM contexts away from the rpc main thread. Ideally, we
> should try to avoid performing any heavy operations which do not modify the
> RM's internal states in the rpc main thread. With FLINK-19241, this can be
> achieved easily by delegating the work to the io executor.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Oct 12, 2020 at 12:44 PM Paul Lam  wrote:
>
>> Hi,
>>
>> After FLINK-13184 is implemented (even with Flink 1.11), occasionally
>> there would still be jobs
>> with high parallelism getting TM-RM heartbeat timeouts when RM is busy
>> creating TM contexts
>> on cluster initialization and HDFS is slow at that moment.
>>
>> Apart from increasing the TM heartbeat timeout, is there any recommended
>>  out of the box
>> approach that can reduce the chance of getting the timeouts?
>>
>> In the long run, is it possible to limit the number of taskmanager
>> contexts that RM creates at
>> a time, so that the heartbeat triggers can chime in?
>>
>> Thanks!
>>
>> Best,
>> Paul Lam
>>
>


Re: TM heartbeat timeout due to ResourceManager being busy

2020-10-11 Thread Xintong Song
Hi Paul,

Thanks for reporting this.

Indeed, Flink's RM currently performs several HDFS operations in the rpc
main thread when preparing the TM context, which may block the main thread
when HDFS is slow.

Unfortunately, I don't see any out-of-box approach that fixes the problem
at the moment, except for increasing the heartbeat timeout.

As for the long run solution, I think there's an easier approach. We can
move creating of the TM contexts away from the rpc main thread. Ideally, we
should try to avoid performing any heavy operations which do not modify the
RM's internal states in the rpc main thread. With FLINK-19241, this can be
achieved easily by delegating the work to the io executor.
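
In code, the pattern looks roughly like the following generic sketch (a
simplified illustration, not the actual ResourceManager implementation):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OffloadBlockingWork {

    // Placeholder for the slow HDFS calls done while preparing a TM context.
    static String createTaskManagerContext() {
        return "tm-context";
    }

    public static void main(String[] args) {
        // Stands in for the single RPC main thread, which must never block.
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        // Dedicated pool for blocking I/O.
        ExecutorService ioExecutor = Executors.newFixedThreadPool(4);

        CompletableFuture<Void> done = CompletableFuture
                // run the blocking part on the I/O pool ...
                .supplyAsync(OffloadBlockingWork::createTaskManagerContext, ioExecutor)
                // ... and only apply the result back on the main thread,
                // where internal state may be touched
                .thenAcceptAsync(ctx -> System.out.println("applying " + ctx), mainThread);

        done.join();
        ioExecutor.shutdown();
        mainThread.shutdown();
    }
}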

Thank you~

Xintong Song



On Mon, Oct 12, 2020 at 12:44 PM Paul Lam  wrote:

> Hi,
>
> After FLINK-13184 is implemented (even with Flink 1.11), occasionally
> there would still be jobs
> with high parallelism getting TM-RM heartbeat timeouts when RM is busy
> creating TM contexts
> on cluster initialization and HDFS is slow at that moment.
>
> Apart from increasing the TM heartbeat timeout, is there any recommended
>  out of the box
> approach that can reduce the chance of getting the timeouts?
>
> In the long run, is it possible to limit the number of taskmanager
> contexts that RM creates at
> a time, so that the heartbeat triggers can chime in?
>
> Thanks!
>
> Best,
> Paul Lam
>


TM heartbeat timeout due to ResourceManager being busy

2020-10-11 Thread Paul Lam
Hi,

After FLINK-13184 is implemented (even with Flink 1.11), occasionally there 
would still be jobs 
with high parallelism getting TM-RM heartbeat timeouts when RM is busy creating 
TM contexts 
on cluster initialization and HDFS is slow at that moment. 

Apart from increasing the TM heartbeat timeout, is there any recommended  out 
of the box 
approach that can reduce the chance of getting the timeouts? 

In the long run, is it possible to limit the number of taskmanager contexts 
that RM creates at 
a time, so that the heartbeat triggers can chime in? 
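
Something like the following generic sketch is what I have in mind (not
Flink code, just to illustrate bounding the number of in-flight context
creations with a semaphore):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class BoundedContextCreation {

    // Placeholder for the slow, HDFS-heavy context creation.
    static void createTaskManagerContext(int tm) {
        System.out.println("created context for TM " + tm);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(16);
        // Allow at most 4 context creations in flight at any time, so that
        // other work (like heartbeats) keeps getting a chance to run.
        Semaphore permits = new Semaphore(4);

        for (int i = 0; i < 100; i++) {
            final int tm = i;
            permits.acquire();                // wait for a free slot first
            pool.submit(() -> {
                try {
                    createTaskManagerContext(tm);
                } finally {
                    permits.release();
                }
            });
        }
        pool.shutdown();
    }
}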

Thanks!

Best,
Paul Lam

Frequent Heartbeat timeout

2019-02-11 Thread sohimankotia
Hi,

I am using Flink 1.5.5. I have a streaming job with 25 * 6 (150)
parallelism. I am facing too-frequent heartbeat timeouts, even during
off-peak hours, which should rule out memory issues.

Also, I enabled debug logs for Flink and observed that the heartbeat
request is getting triggered every 5 seconds.
Flink-conf.yml:

akka.ask.timeout: 120s
akka.client.timeout: 900s
akka.framesize: 256477376b
akka.lookup.timeout: 60s
akka.tcp.timeout: 900s
akka.watch.heartbeat.interval: 120s
akka.watch.heartbeat.pause: 900s


If that is the case, could these be false alerts?
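
For reference: the akka.watch.* entries above configure Akka's DeathWatch,
not Flink's own heartbeat services. Those are controlled by separate keys,
which at their defaults (unless overridden) are, in milliseconds:

heartbeat.interval: 10000
heartbeat.timeout: 50000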


Logs:

2019-02-12 09:40:20,279 DEBUG org.apache.flink.yarn.YarnResourceManager - Received new slot report from TaskManager container_e187_1544779926156_149366_01_25.
2019-02-12 09:40:20,279 DEBUG org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Received slot report from instance cee153db98a847c6ee779c31d60fd990.
2019-02-12 09:40:24,951 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Trigger heartbeat request.
2019-02-12 09:40:24,955 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from container_e187_1544779926156_149366_01_06.
2019-02-12 09:40:24,956 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from container_e187_1544779926156_149366_01_04.
2019-02-12 09:40:24,956 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from container_e187_1544779926156_149366_01_26.
2019-02-12 09:40:24,958 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from container_e187_1544779926156_149366_01_20.
2019-02-12 09:40:24,959 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from container_e187_1544779926156_149366_01_10.
2019-02-12 09:40:24,960 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from container_e187_1544779926156_149366_01_09.
2019-02-12 09:40:24,962 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from container_e187_1544779926156_149366_01_25.
2019-02-12 09:40:24,963 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from container_e187_1544779926156_149366_01_14.
2019-02-12 09:40:24,964 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from container_e187_1544779926156_149366_01_11.
2019-02-12 09:40:24,966 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from container_e187_1544779926156_149366_01_12.
2019-02-12 09:40:24,967 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from container_e187_1544779926156_149366_01_18.
2019-02-12 09:40:24,969 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from container_e187_1544779926156_149366_01_05.
2019-02-12 09:40:24,970 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from container_e187_1544779926156_149366_01_24.
2019-02-12 09:40:24,971 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from container_e187_1544779926156_149366_01_22.
2019-02-12 09:40:24,975 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from container_e187_1544779926156_149366_01_21.
2019-02-12 09:40:24,977 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from container_e187_1544779926156_149366_01_08.
2019-02-12 09:40:24,978 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from container_e187_1544779926156_149366_01_15.
2019-02-12 09:40:24,981 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from container_e187_1544779926156_149366_01_16.
2019-02-12 09:40:24,983 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from container_e187_1544779926156_149366_01_03.
2019-02-12 09:40:24,984 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from container_e187_1544779926156_149366_01_13.
2019-02-12 09:40:24,985 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from container_e187_1544779926156_149366_01_19.
2019-02-12 09:40:24,986 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from container_e187_1544779926156_149366_01_07.
2019-02-12 09:40:24,987 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Received heartbeat from container_e187_1544779926156_149366_01_02.
2019-02-12 09:40:24,989 DEBUG