Re: Job Recovery Time on TM Lost

2021-07-12 Thread 刘建刚
Yes, time is the main factor when detecting the TM's liveness. The
count-based method also checks at certain intervals.

Gen Luo wrote on Fri, Jul 9, 2021 at 10:37 AM:

> @刘建刚
> Welcome to join the discuss and thanks for sharing your experience.
>
> I have a minor question. In my experience, network failures in a certain
> cluster usually takes a time to recovery, which can be measured as p99 to
> guide configuring. So I suppose it would be better to use time than attempt
> count as the configuration for confirming TM liveness. How do you think
> about this? Or is the premise right according to your experience?
>
> @Lu Niu 
> > Does that mean the akka timeout situation we talked above doesn't apply
> to flink 1.11?
>
> I suppose it's true. According to the reply from Till in FLINK-23216
> , it should be
> confirmed that the problem is introduced by declarative resource
> management, which is introduced to Flink in 1.12.
>
> In previous versions, although JM still uses heartbeat to check TMs
> status, RM will tell JM about TM lost once it is noticed by Yarn. This is
> much faster than JM's heartbeat mechanism, if one uses default heartbeat
> configurations. However, after 1.12 with declarative resource management,
> RM will no longer tell this to JM, since it doesn't have a related
> AllocationID.  So the heartbeat mechanism becomes the only way JM can know
> about TM lost.
>
> On Fri, Jul 9, 2021 at 6:34 AM Lu Niu  wrote:
>
>> Thanks everyone! This is a great discussion!
>>
>> 1. Restarting takes 30s when throwing exceptions from application code
>> because the restart delay is 30s in config. Before lots of related config
>> are 30s which lead to the confusion. I redo the test with config:
>>
>> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647,
>> backoffTimeMS=1000)
>> heartbeat.timeout: 50
>> akka.ask.timeout 30 s
>> akka.lookup.timeout 30 s
>> akka.tcp.timeout 30 s
>> akka.watch.heartbeat.interval 30 s
>> akka.watch.heartbeat.pause 120 s
>>
>>Now Phase 1 drops down to 2s now and phase 2 takes 13s. The whole
>> restart takes 14s. Does that mean the akka timeout situation we talked
>> above doesn't apply to flink 1.11?
>>
>> 2. About flaky connection between TMs, we did notice sometimes exception
>> as follows:
>> ```
>> TaskFoo switched from RUNNING to FAILED on
>> container_e02_1599158147594_156068_01_38 @
>> xenon-prod-001-20200316-data-slave-prod-0a0201a4.ec2.pin220.com
>> (dataPort=40957).
>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>> Connection unexpectedly closed by remote task manager '
>> xenon-prod-001-20200316-data-slave-prod-0a020f7a.ec2.pin220.com/10.2.15.122:33539'.
>> This might indicate that the remote task manager was lost.
>> at
>> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:144)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
>> at
>> org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:97)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1416)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:912)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:816)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
>> at
>> 

Re: Job Recovery Time on TM Lost

2021-07-09 Thread Till Rohrmann
Gen is right with his explanation of why the dead TM discovery can be faster
with Flink < 1.12.

Concerning flaky TaskManager connections:

2.1 I think the problem is that the receiving TM does not know the
container ID of the sending TM; it only knows its address. But this is
something one could improve by sending this information along with the
connection information. This could improve debugging.
2.3 The idea would be to establish a reconnection mechanism between the
TMs. This hasn't been done yet, though. The non-trivial part is probably
that we need to introduce an ack protocol and keep a window of sent events
on the sender side in order to resend events in case of lost packets.
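
Since this hasn't been built, the following is only a minimal, self-contained
sketch of such a sender-side ack window; all class and method names are
invented for illustration and are not Flink APIs:

```
import java.util.ArrayDeque;
import java.util.Deque;

public class AckWindowSender {

    /** An event together with the sequence number it was sent under. */
    private static final class PendingEvent {
        final long sequenceNumber;
        final Object event;
        PendingEvent(long sequenceNumber, Object event) {
            this.sequenceNumber = sequenceNumber;
            this.event = event;
        }
    }

    private final Deque<PendingEvent> window = new ArrayDeque<>();
    private final int maxWindowSize;
    private long nextSequenceNumber = 0L;

    public AckWindowSender(int maxWindowSize) {
        this.maxWindowSize = maxWindowSize;
    }

    /** Sends an event and remembers it until the receiver acknowledges it. */
    public synchronized void send(Object event) {
        if (window.size() >= maxWindowSize) {
            // Back-pressure: the caller must wait until older events are acked.
            throw new IllegalStateException("Send window is full");
        }
        PendingEvent pending = new PendingEvent(nextSequenceNumber++, event);
        window.addLast(pending);
        transmit(pending);
    }

    /** Called when the receiver acknowledges everything up to ackedSequenceNumber. */
    public synchronized void onAck(long ackedSequenceNumber) {
        while (!window.isEmpty() && window.peekFirst().sequenceNumber <= ackedSequenceNumber) {
            window.removeFirst();
        }
    }

    /** Called after a reconnect: resend everything that was never acknowledged. */
    public synchronized void onReconnect() {
        for (PendingEvent pending : window) {
            transmit(pending);
        }
    }

    private void transmit(PendingEvent pending) {
        // In a real implementation this would write to the network channel.
        System.out.println("send #" + pending.sequenceNumber + ": " + pending.event);
    }
}
```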

Cheers,
Till

On Fri, Jul 9, 2021 at 4:38 AM Gen Luo  wrote:

> @刘建刚
> Welcome to join the discuss and thanks for sharing your experience.
>
> I have a minor question. In my experience, network failures in a certain
> cluster usually takes a time to recovery, which can be measured as p99 to
> guide configuring. So I suppose it would be better to use time than attempt
> count as the configuration for confirming TM liveness. How do you think
> about this? Or is the premise right according to your experience?
>
> @Lu Niu 
> > Does that mean the akka timeout situation we talked above doesn't apply
> to flink 1.11?
>
> I suppose it's true. According to the reply from Till in FLINK-23216
> , it should be
> confirmed that the problem is introduced by declarative resource
> management, which is introduced to Flink in 1.12.
>
> In previous versions, although JM still uses heartbeat to check TMs
> status, RM will tell JM about TM lost once it is noticed by Yarn. This is
> much faster than JM's heartbeat mechanism, if one uses default heartbeat
> configurations. However, after 1.12 with declarative resource management,
> RM will no longer tell this to JM, since it doesn't have a related
> AllocationID.  So the heartbeat mechanism becomes the only way JM can know
> about TM lost.
>
> On Fri, Jul 9, 2021 at 6:34 AM Lu Niu  wrote:
>
>> Thanks everyone! This is a great discussion!
>>
>> 1. Restarting takes 30s when throwing exceptions from application code
>> because the restart delay is 30s in config. Before lots of related config
>> are 30s which lead to the confusion. I redo the test with config:
>>
>> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647,
>> backoffTimeMS=1000)
>> heartbeat.timeout: 50
>> akka.ask.timeout 30 s
>> akka.lookup.timeout 30 s
>> akka.tcp.timeout 30 s
>> akka.watch.heartbeat.interval 30 s
>> akka.watch.heartbeat.pause 120 s
>>
>>Now Phase 1 drops down to 2s now and phase 2 takes 13s. The whole
>> restart takes 14s. Does that mean the akka timeout situation we talked
>> above doesn't apply to flink 1.11?
>>
>> 2. About flaky connection between TMs, we did notice sometimes exception
>> as follows:
>> ```
>> TaskFoo switched from RUNNING to FAILED on
>> container_e02_1599158147594_156068_01_38 @
>> xenon-prod-001-20200316-data-slave-prod-0a0201a4.ec2.pin220.com
>> (dataPort=40957).
>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>> Connection unexpectedly closed by remote task manager '
>> xenon-prod-001-20200316-data-slave-prod-0a020f7a.ec2.pin220.com/10.2.15.122:33539'.
>> This might indicate that the remote task manager was lost.
>> at
>> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:144)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
>> at
>> org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:97)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1416)
>> at
>> 

Re: Job Recovery Time on TM Lost

2021-07-08 Thread Gen Luo
@刘建刚
Welcome to the discussion and thanks for sharing your experience.

I have a minor question. In my experience, network failures in a given
cluster usually take some time to recover, and that duration can be measured
(e.g., as a p99) to guide the configuration. So I suppose it would be better
to use time rather than an attempt count as the configuration for confirming
TM liveness. What do you think about this? Or is the premise correct
according to your experience?

@Lu Niu 
> Does that mean the akka timeout situation we talked above doesn't apply
to flink 1.11?

I suppose that's true. According to Till's reply in FLINK-23216, it can be
confirmed that the problem is introduced by declarative resource management,
which was introduced in Flink 1.12.

In previous versions, although the JM still uses heartbeats to check TM
status, the RM will tell the JM about a lost TM as soon as Yarn notices it.
This is much faster than the JM's heartbeat mechanism if one uses the
default heartbeat configuration. However, since 1.12 with declarative
resource management, the RM no longer tells this to the JM, since it doesn't
have a related AllocationID. So the heartbeat mechanism becomes the only way
the JM can learn about a lost TM.

On Fri, Jul 9, 2021 at 6:34 AM Lu Niu  wrote:

> Thanks everyone! This is a great discussion!
>
> 1. Restarting takes 30s when throwing exceptions from application code
> because the restart delay is 30s in config. Before lots of related config
> are 30s which lead to the confusion. I redo the test with config:
>
> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647,
> backoffTimeMS=1000)
> heartbeat.timeout: 50
> akka.ask.timeout 30 s
> akka.lookup.timeout 30 s
> akka.tcp.timeout 30 s
> akka.watch.heartbeat.interval 30 s
> akka.watch.heartbeat.pause 120 s
>
>Now Phase 1 drops down to 2s now and phase 2 takes 13s. The whole
> restart takes 14s. Does that mean the akka timeout situation we talked
> above doesn't apply to flink 1.11?
>
> 2. About flaky connection between TMs, we did notice sometimes exception
> as follows:
> ```
> TaskFoo switched from RUNNING to FAILED on
> container_e02_1599158147594_156068_01_38 @
> xenon-prod-001-20200316-data-slave-prod-0a0201a4.ec2.pin220.com
> (dataPort=40957).
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connection unexpectedly closed by remote task manager '
> xenon-prod-001-20200316-data-slave-prod-0a020f7a.ec2.pin220.com/10.2.15.122:33539'.
> This might indicate that the remote task manager was lost.
> at
> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:144)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
> at
> org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:97)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1416)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:912)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:816)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:331)
> at
> 

Re: Job Recovery Time on TM Lost

2021-07-08 Thread Lu Niu
Thanks everyone! This is a great discussion!

1. Restarting takes 30s when throwing exceptions from application code
because the restart delay is 30s in the config. Previously, lots of related
configs were also 30s, which led to the confusion. I redid the test with
this config:

FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647,
backoffTimeMS=1000)
heartbeat.timeout: 50
akka.ask.timeout 30 s
akka.lookup.timeout 30 s
akka.tcp.timeout 30 s
akka.watch.heartbeat.interval 30 s
akka.watch.heartbeat.pause 120 s

   Now Phase 1 drops down to 2s and Phase 2 takes 13s. The whole
restart takes 14s. Does that mean the akka timeout situation we talked
about above doesn't apply to Flink 1.11?

2. About flaky connections between TMs, we did sometimes notice exceptions
like the following:
```
TaskFoo switched from RUNNING to FAILED on
container_e02_1599158147594_156068_01_38 @
xenon-prod-001-20200316-data-slave-prod-0a0201a4.ec2.pin220.com
(dataPort=40957).
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Connection unexpectedly closed by remote task manager '
xenon-prod-001-20200316-data-slave-prod-0a020f7a.ec2.pin220.com/10.2.15.122:33539'.
This might indicate that the remote task manager was lost.
at
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:144)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at
org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:97)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1416)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:912)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:816)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
at
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:331)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
```
1. It's a bit inconvenient to debug such an exception because it doesn't
report the exact container id. Right now we have to look for `
xenon-prod-001-20200316-data-slave-prod-0a020f7a.ec2.pin220.com/10.2.15.122:33539`
in the JobManager log to find that.
2. The task manager log doesn't show anything suspicious. Also, no major
GC. So it might imply a flaky connection in this case.
3. Is there any short-term workaround we can try? Any config tuning? Also,
what's the long-term solution?

Best
Lu




On Tue, Jul 6, 2021 at 11:45 PM 刘建刚  wrote:

> It is really helpful to find the lost container quickly. In our inner
> flink version, we optimize it by task's report and jobmaster's probe. When
> a task fails because of the connection, it reports to the jobmaster. The
> jobmaster will try to confirm the liveness of the unconnected
> taskmanager for certain times by config. If the jobmaster find the
> taskmanager unconnected or dead, it releases the taskmanger. This will work
> for most cases. For an unstable environment, config needs adjustment.
>
> Gen Luo wrote on Tue, Jul 6, 2021 at 8:41 PM:
>
>> Yes, I have noticed the PR and commented there with some consideration
>> about the new option. 

Re: Job Recovery Time on TM Lost

2021-07-07 Thread 刘建刚
It is really helpful to find the lost container quickly. In our internal
Flink version, we optimize this with the task's report and the jobmaster's
probe. When a task fails because of a connection problem, it reports this to
the jobmaster. The jobmaster then tries to confirm the liveness of the
unconnected taskmanager a number of times configured by the user. If the
jobmaster finds the taskmanager unconnected or dead, it releases the
taskmanager. This works for most cases. For an unstable environment, the
config needs adjustment.
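
A rough, self-contained sketch of this report-and-probe idea; all class and
method names here are invented for illustration and are not the actual
internal implementation:

```
import java.util.concurrent.TimeUnit;

public class TaskManagerProber {

    interface TaskManagerGateway {
        /** Returns true if the taskmanager answered the probe in time. */
        boolean probe(String taskManagerId, long timeoutMillis);
        /** Releases the taskmanager and fails its tasks. */
        void release(String taskManagerId);
    }

    private final TaskManagerGateway gateway;
    private final int maxProbes;            // configurable number of confirmation attempts
    private final long probeTimeoutMillis;
    private final long probeIntervalMillis;

    public TaskManagerProber(TaskManagerGateway gateway, int maxProbes,
                             long probeTimeoutMillis, long probeIntervalMillis) {
        this.gateway = gateway;
        this.maxProbes = maxProbes;
        this.probeTimeoutMillis = probeTimeoutMillis;
        this.probeIntervalMillis = probeIntervalMillis;
    }

    /** Called when a task reports a connection failure to the given taskmanager. */
    public void onConnectionFailureReported(String taskManagerId) throws InterruptedException {
        for (int attempt = 1; attempt <= maxProbes; attempt++) {
            if (gateway.probe(taskManagerId, probeTimeoutMillis)) {
                // The taskmanager is still alive; treat the report as a transient glitch.
                return;
            }
            TimeUnit.MILLISECONDS.sleep(probeIntervalMillis);
        }
        // All probes failed: consider the taskmanager dead and release it right away,
        // instead of waiting for the full heartbeat timeout.
        gateway.release(taskManagerId);
    }
}
```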

Gen Luo wrote on Tue, Jul 6, 2021 at 8:41 PM:

> Yes, I have noticed the PR and commented there with some consideration
> about the new option. We can discuss further there.
>
> On Tue, Jul 6, 2021 at 6:04 PM Till Rohrmann  wrote:
>
> > This is actually a very good point Gen. There might not be a lot to gain
> > for us by implementing a fancy algorithm for figuring out whether a TM is
> > dead or not based on failed heartbeat RPCs from the JM if the TM <> TM
> > communication does not tolerate failures and directly fails the affected
> > tasks. This assumes that the JM and TM run in the same environment.
> >
> > One simple approach could be to make the number of failed heartbeat RPCs
> > until a target is marked as unreachable configurable because what
> > represents a good enough criterion in one user's environment might
> produce
> > too many false-positives in somebody else's environment. Or even simpler,
> > one could say that one can disable reacting to a failed heartbeat RPC as
> it
> > is currently the case.
> >
> > We currently have a discussion about this on this PR [1]. Maybe you wanna
> > join the discussion there and share your insights.
> >
> > [1] https://github.com/apache/flink/pull/16357
> >
> > Cheers,
> > Till
> >
> > On Tue, Jul 6, 2021 at 4:37 AM Gen Luo  wrote:
> >
> >> I know that there are retry strategies for akka rpc frameworks. I was
> >> just considering that, since the environment is shared by JM and TMs,
> and
> >> the connections among TMs (using netty) are flaky in unstable
> environments,
> >> which will also cause the job failure, is it necessary to build a
> >> strongly guaranteed connection between JM and TMs, or it could be as
> flaky
> >> as the connections among TMs?
> >>
> >> As far as I know, connections among TMs will just fail on their first
> >> connection loss, so behaving like this in JM just means "as flaky as
> >> connections among TMs". In a stable environment it's good enough, but
> in an
> >> unstable environment, it indeed increases the instability. IMO, though a
> >> single connection loss is not reliable, a double check should be good
> >> enough. But since I'm not experienced with an unstable environment, I
> can't
> >> tell whether that's also enough for it.
> >>
> >> On Mon, Jul 5, 2021 at 5:59 PM Till Rohrmann 
> >> wrote:
> >>
> >>> I think for RPC communication there are retry strategies used by the
> >>> underlying Akka ActorSystem. So a RpcEndpoint can reconnect to a remote
> >>> ActorSystem and resume communication. Moreover, there are also
> >>> reconciliation protocols in place which reconcile the states between
> the
> >>> components because of potentially lost RPC messages. So the main
> question
> >>> would be whether a single connection loss is good enough for
> triggering the
> >>> timeout or whether we want a more elaborate mechanism to reason about
> the
> >>> availability of the remote system (e.g. a couple of lost heartbeat
> >>> messages).
> >>>
> >>> Cheers,
> >>> Till
> >>>
> >>> On Mon, Jul 5, 2021 at 10:00 AM Gen Luo  wrote:
> >>>
>  As far as I know, a TM will report connection failure once its
>  connected TM is lost. I suppose JM can believe the report and fail the
>  tasks in the lost TM if it also encounters a connection failure.
> 
>  Of course, it won't work if the lost TM is standalone. But I suppose
> we
>  can use the same strategy as the connected scenario. That is,
> consider it
>  possibly lost on the first connection loss, and fail it if double
> check
>  also fails. The major difference is the senders of the probes are the
> same
>  one rather than two different roles, so the results may tend to be
> the same.
> 
>  On the other hand, the fact also means that the jobs can be fragile in
>  an unstable environment, no matter whether the failover is triggered
> by TM
>  or JM. So maybe it's not that worthy to introduce extra
> configurations for
>  fault tolerance of heartbeat, unless we also introduce some retry
>  strategies for netty connections.
> 
> 
>  On Fri, Jul 2, 2021 at 9:34 PM Till Rohrmann 
>  wrote:
> 
> > Could you share the full logs with us for the second experiment, Lu?
> I
> > cannot tell from the top of my head why it should take 30s unless
> you have
> > configured a restart delay of 30s.
> >
> > Let's discuss FLINK-23216 on the JIRA ticket, Gen.
> >
> > I've now implemented FLINK-23209 [1] but it somehow has the 

Re: Job Recovery Time on TM Lost

2021-07-06 Thread Gen Luo
Yes, I have noticed the PR and commented there with some considerations
about the new option. We can discuss it further there.

On Tue, Jul 6, 2021 at 6:04 PM Till Rohrmann  wrote:

> This is actually a very good point Gen. There might not be a lot to gain
> for us by implementing a fancy algorithm for figuring out whether a TM is
> dead or not based on failed heartbeat RPCs from the JM if the TM <> TM
> communication does not tolerate failures and directly fails the affected
> tasks. This assumes that the JM and TM run in the same environment.
>
> One simple approach could be to make the number of failed heartbeat RPCs
> until a target is marked as unreachable configurable because what
> represents a good enough criterion in one user's environment might produce
> too many false-positives in somebody else's environment. Or even simpler,
> one could say that one can disable reacting to a failed heartbeat RPC as it
> is currently the case.
>
> We currently have a discussion about this on this PR [1]. Maybe you wanna
> join the discussion there and share your insights.
>
> [1] https://github.com/apache/flink/pull/16357
>
> Cheers,
> Till
>
> On Tue, Jul 6, 2021 at 4:37 AM Gen Luo  wrote:
>
>> I know that there are retry strategies for akka rpc frameworks. I was
>> just considering that, since the environment is shared by JM and TMs, and
>> the connections among TMs (using netty) are flaky in unstable environments,
>> which will also cause the job failure, is it necessary to build a
>> strongly guaranteed connection between JM and TMs, or it could be as flaky
>> as the connections among TMs?
>>
>> As far as I know, connections among TMs will just fail on their first
>> connection loss, so behaving like this in JM just means "as flaky as
>> connections among TMs". In a stable environment it's good enough, but in an
>> unstable environment, it indeed increases the instability. IMO, though a
>> single connection loss is not reliable, a double check should be good
>> enough. But since I'm not experienced with an unstable environment, I can't
>> tell whether that's also enough for it.
>>
>> On Mon, Jul 5, 2021 at 5:59 PM Till Rohrmann 
>> wrote:
>>
>>> I think for RPC communication there are retry strategies used by the
>>> underlying Akka ActorSystem. So a RpcEndpoint can reconnect to a remote
>>> ActorSystem and resume communication. Moreover, there are also
>>> reconciliation protocols in place which reconcile the states between the
>>> components because of potentially lost RPC messages. So the main question
>>> would be whether a single connection loss is good enough for triggering the
>>> timeout or whether we want a more elaborate mechanism to reason about the
>>> availability of the remote system (e.g. a couple of lost heartbeat
>>> messages).
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Jul 5, 2021 at 10:00 AM Gen Luo  wrote:
>>>
 As far as I know, a TM will report connection failure once its
 connected TM is lost. I suppose JM can believe the report and fail the
 tasks in the lost TM if it also encounters a connection failure.

 Of course, it won't work if the lost TM is standalone. But I suppose we
 can use the same strategy as the connected scenario. That is, consider it
 possibly lost on the first connection loss, and fail it if double check
 also fails. The major difference is the senders of the probes are the same
 one rather than two different roles, so the results may tend to be the 
 same.

 On the other hand, the fact also means that the jobs can be fragile in
 an unstable environment, no matter whether the failover is triggered by TM
 or JM. So maybe it's not that worthy to introduce extra configurations for
 fault tolerance of heartbeat, unless we also introduce some retry
 strategies for netty connections.


 On Fri, Jul 2, 2021 at 9:34 PM Till Rohrmann 
 wrote:

> Could you share the full logs with us for the second experiment, Lu? I
> cannot tell from the top of my head why it should take 30s unless you have
> configured a restart delay of 30s.
>
> Let's discuss FLINK-23216 on the JIRA ticket, Gen.
>
> I've now implemented FLINK-23209 [1] but it somehow has the problem
> that in a flakey environment you might not want to mark a TaskExecutor 
> dead
> on the first connection loss. Maybe this is something we need to make
> configurable (e.g. introducing a threshold which admittedly is similar to
> the heartbeat timeout) so that the user can configure it for her
> environment. On the upside, if you mark the TaskExecutor dead on the first
> connection loss (assuming you have a stable network environment), then it
> can now detect lost TaskExecutors as fast as the heartbeat interval.
>
> [1] https://issues.apache.org/jira/browse/FLINK-23209
>
> Cheers,
> Till
>
> On Fri, Jul 2, 2021 at 9:33 AM Gen Luo  wrote:
>
>> Thanks for 

Re: Job Recovery Time on TM Lost

2021-07-06 Thread Till Rohrmann
This is actually a very good point, Gen. There might not be a lot to gain
for us by implementing a fancy algorithm for figuring out whether a TM is
dead or not based on failed heartbeat RPCs from the JM if the TM <> TM
communication does not tolerate failures and directly fails the affected
tasks. This assumes that the JM and the TMs run in the same environment.

One simple approach could be to make the number of failed heartbeat RPCs
until a target is marked as unreachable configurable, because what
represents a good enough criterion in one user's environment might produce
too many false positives in somebody else's environment. Or, even simpler,
one could keep the option to disable reacting to a failed heartbeat RPC, as
is currently the case.
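
A minimal sketch of that counting idea, with an invented class name and an
assumed config value (these are not Flink's actual heartbeat classes or
option names):

```
import java.util.concurrent.atomic.AtomicInteger;

public class FailedHeartbeatRpcCounter {

    private final int failedRpcsUntilUnreachable; // e.g. taken from a config option
    private final AtomicInteger consecutiveFailures = new AtomicInteger(0);

    public FailedHeartbeatRpcCounter(int failedRpcsUntilUnreachable) {
        this.failedRpcsUntilUnreachable = failedRpcsUntilUnreachable;
    }

    /**
     * Called when a heartbeat RPC to the target fails. Returns true if the
     * target should now be treated as unreachable, i.e. as if the heartbeat
     * timeout had fired.
     */
    public boolean reportFailedHeartbeatRpc() {
        // In this sketch, a value <= 0 means "never react to failed RPCs",
        // which corresponds to the current behaviour described above.
        if (failedRpcsUntilUnreachable <= 0) {
            return false;
        }
        return consecutiveFailures.incrementAndGet() >= failedRpcsUntilUnreachable;
    }

    /** Called when a heartbeat RPC succeeds; earlier failures were transient. */
    public void reportSuccessfulHeartbeatRpc() {
        consecutiveFailures.set(0);
    }
}
```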

We currently have a discussion about this on this PR [1]. Maybe you wanna
join the discussion there and share your insights.

[1] https://github.com/apache/flink/pull/16357

Cheers,
Till

On Tue, Jul 6, 2021 at 4:37 AM Gen Luo  wrote:

> I know that there are retry strategies for akka rpc frameworks. I was just
> considering that, since the environment is shared by JM and TMs, and the
> connections among TMs (using netty) are flaky in unstable environments,
> which will also cause the job failure, is it necessary to build a
> strongly guaranteed connection between JM and TMs, or it could be as flaky
> as the connections among TMs?
>
> As far as I know, connections among TMs will just fail on their first
> connection loss, so behaving like this in JM just means "as flaky as
> connections among TMs". In a stable environment it's good enough, but in an
> unstable environment, it indeed increases the instability. IMO, though a
> single connection loss is not reliable, a double check should be good
> enough. But since I'm not experienced with an unstable environment, I can't
> tell whether that's also enough for it.
>
> On Mon, Jul 5, 2021 at 5:59 PM Till Rohrmann  wrote:
>
>> I think for RPC communication there are retry strategies used by the
>> underlying Akka ActorSystem. So a RpcEndpoint can reconnect to a remote
>> ActorSystem and resume communication. Moreover, there are also
>> reconciliation protocols in place which reconcile the states between the
>> components because of potentially lost RPC messages. So the main question
>> would be whether a single connection loss is good enough for triggering the
>> timeout or whether we want a more elaborate mechanism to reason about the
>> availability of the remote system (e.g. a couple of lost heartbeat
>> messages).
>>
>> Cheers,
>> Till
>>
>> On Mon, Jul 5, 2021 at 10:00 AM Gen Luo  wrote:
>>
>>> As far as I know, a TM will report connection failure once its connected
>>> TM is lost. I suppose JM can believe the report and fail the tasks in the
>>> lost TM if it also encounters a connection failure.
>>>
>>> Of course, it won't work if the lost TM is standalone. But I suppose we
>>> can use the same strategy as the connected scenario. That is, consider it
>>> possibly lost on the first connection loss, and fail it if double check
>>> also fails. The major difference is the senders of the probes are the same
>>> one rather than two different roles, so the results may tend to be the same.
>>>
>>> On the other hand, the fact also means that the jobs can be fragile in
>>> an unstable environment, no matter whether the failover is triggered by TM
>>> or JM. So maybe it's not that worthy to introduce extra configurations for
>>> fault tolerance of heartbeat, unless we also introduce some retry
>>> strategies for netty connections.
>>>
>>>
>>> On Fri, Jul 2, 2021 at 9:34 PM Till Rohrmann 
>>> wrote:
>>>
 Could you share the full logs with us for the second experiment, Lu? I
 cannot tell from the top of my head why it should take 30s unless you have
 configured a restart delay of 30s.

 Let's discuss FLINK-23216 on the JIRA ticket, Gen.

 I've now implemented FLINK-23209 [1] but it somehow has the problem
 that in a flakey environment you might not want to mark a TaskExecutor dead
 on the first connection loss. Maybe this is something we need to make
 configurable (e.g. introducing a threshold which admittedly is similar to
 the heartbeat timeout) so that the user can configure it for her
 environment. On the upside, if you mark the TaskExecutor dead on the first
 connection loss (assuming you have a stable network environment), then it
 can now detect lost TaskExecutors as fast as the heartbeat interval.

 [1] https://issues.apache.org/jira/browse/FLINK-23209

 Cheers,
 Till

 On Fri, Jul 2, 2021 at 9:33 AM Gen Luo  wrote:

> Thanks for sharing, Till and Yang.
>
> @Lu
> Sorry but I don't know how to explain the new test with the log. Let's
> wait for others' reply.
>
> @Till
> It would be nice if JIRAs could be fixed. Thanks again for proposing
> them.
>
> In addition, I was tracking an issue that RM 

Re: Job Recovery Time on TM Lost

2021-07-05 Thread Gen Luo
I know that there are retry strategies in the akka RPC framework. I was just
considering that, since the environment is shared by the JM and the TMs, and
the connections among TMs (using netty) are flaky in unstable environments,
which will also cause job failures, is it necessary to build a strongly
guaranteed connection between the JM and the TMs, or could it be as flaky as
the connections among TMs?

As far as I know, connections among TMs will just fail on their first
connection loss, so behaving like this in the JM just means being "as flaky
as the connections among TMs". In a stable environment that's good enough,
but in an unstable environment it indeed increases the instability. IMO,
though a single connection loss is not a reliable signal, a double check
should be good enough. But since I'm not experienced with unstable
environments, I can't tell whether that's also enough there.

On Mon, Jul 5, 2021 at 5:59 PM Till Rohrmann  wrote:

> I think for RPC communication there are retry strategies used by the
> underlying Akka ActorSystem. So a RpcEndpoint can reconnect to a remote
> ActorSystem and resume communication. Moreover, there are also
> reconciliation protocols in place which reconcile the states between the
> components because of potentially lost RPC messages. So the main question
> would be whether a single connection loss is good enough for triggering the
> timeout or whether we want a more elaborate mechanism to reason about the
> availability of the remote system (e.g. a couple of lost heartbeat
> messages).
>
> Cheers,
> Till
>
> On Mon, Jul 5, 2021 at 10:00 AM Gen Luo  wrote:
>
>> As far as I know, a TM will report connection failure once its connected
>> TM is lost. I suppose JM can believe the report and fail the tasks in the
>> lost TM if it also encounters a connection failure.
>>
>> Of course, it won't work if the lost TM is standalone. But I suppose we
>> can use the same strategy as the connected scenario. That is, consider it
>> possibly lost on the first connection loss, and fail it if double check
>> also fails. The major difference is the senders of the probes are the same
>> one rather than two different roles, so the results may tend to be the same.
>>
>> On the other hand, the fact also means that the jobs can be fragile in an
>> unstable environment, no matter whether the failover is triggered by TM or
>> JM. So maybe it's not that worthy to introduce extra configurations for
>> fault tolerance of heartbeat, unless we also introduce some retry
>> strategies for netty connections.
>>
>>
>> On Fri, Jul 2, 2021 at 9:34 PM Till Rohrmann 
>> wrote:
>>
>>> Could you share the full logs with us for the second experiment, Lu? I
>>> cannot tell from the top of my head why it should take 30s unless you have
>>> configured a restart delay of 30s.
>>>
>>> Let's discuss FLINK-23216 on the JIRA ticket, Gen.
>>>
>>> I've now implemented FLINK-23209 [1] but it somehow has the problem that
>>> in a flakey environment you might not want to mark a TaskExecutor dead on
>>> the first connection loss. Maybe this is something we need to make
>>> configurable (e.g. introducing a threshold which admittedly is similar to
>>> the heartbeat timeout) so that the user can configure it for her
>>> environment. On the upside, if you mark the TaskExecutor dead on the first
>>> connection loss (assuming you have a stable network environment), then it
>>> can now detect lost TaskExecutors as fast as the heartbeat interval.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Jul 2, 2021 at 9:33 AM Gen Luo  wrote:
>>>
 Thanks for sharing, Till and Yang.

 @Lu
 Sorry but I don't know how to explain the new test with the log. Let's
 wait for others' reply.

 @Till
 It would be nice if JIRAs could be fixed. Thanks again for proposing
 them.

 In addition, I was tracking an issue that RM keeps allocating and
 freeing slots after a TM lost until its heartbeat timeout, when I found the
 recovery costing as long as heartbeat timeout. That should be a minor bug
 introduced by declarative resource management. I have created a JIRA about
 the problem [1] and  we can discuss it there if necessary.

 [1] https://issues.apache.org/jira/browse/FLINK-23216

> Lu Niu wrote on Fri, Jul 2, 2021 at 3:13 AM:

> Another side question, Shall we add metric to cover the complete
> restarting time (phase 1 + phase 2)? Current metric jm.restartingTime only
> covers phase 1. Thanks!
>
> Best
> Lu
>
> On Thu, Jul 1, 2021 at 12:09 PM Lu Niu  wrote:
>
>> Thanks TIll and Yang for help! Also Thanks Till for a quick fix!
>>
>> I did another test yesterday. In this test, I intentionally throw
>> exception from the source operator:
>> ```
>> if (runtimeContext.getIndexOfThisSubtask() == 1
>> && errorFrenquecyInMin > 0
>> && System.currentTimeMillis() - lastStartTime >=
>> 

Re: Job Recovery Time on TM Lost

2021-07-05 Thread Till Rohrmann
I think for RPC communication there are retry strategies used by the
underlying Akka ActorSystem. So a RpcEndpoint can reconnect to a remote
ActorSystem and resume communication. Moreover, there are also
reconciliation protocols in place which reconcile the states between the
components because of potentially lost RPC messages. So the main question
would be whether a single connection loss is good enough for triggering the
timeout or whether we want a more elaborate mechanism to reason about the
availability of the remote system (e.g. a couple of lost heartbeat
messages).

Cheers,
Till

On Mon, Jul 5, 2021 at 10:00 AM Gen Luo  wrote:

> As far as I know, a TM will report connection failure once its connected
> TM is lost. I suppose JM can believe the report and fail the tasks in the
> lost TM if it also encounters a connection failure.
>
> Of course, it won't work if the lost TM is standalone. But I suppose we
> can use the same strategy as the connected scenario. That is, consider it
> possibly lost on the first connection loss, and fail it if double check
> also fails. The major difference is the senders of the probes are the same
> one rather than two different roles, so the results may tend to be the same.
>
> On the other hand, the fact also means that the jobs can be fragile in an
> unstable environment, no matter whether the failover is triggered by TM or
> JM. So maybe it's not that worthy to introduce extra configurations for
> fault tolerance of heartbeat, unless we also introduce some retry
> strategies for netty connections.
>
>
> On Fri, Jul 2, 2021 at 9:34 PM Till Rohrmann  wrote:
>
>> Could you share the full logs with us for the second experiment, Lu? I
>> cannot tell from the top of my head why it should take 30s unless you have
>> configured a restart delay of 30s.
>>
>> Let's discuss FLINK-23216 on the JIRA ticket, Gen.
>>
>> I've now implemented FLINK-23209 [1] but it somehow has the problem that
>> in a flakey environment you might not want to mark a TaskExecutor dead on
>> the first connection loss. Maybe this is something we need to make
>> configurable (e.g. introducing a threshold which admittedly is similar to
>> the heartbeat timeout) so that the user can configure it for her
>> environment. On the upside, if you mark the TaskExecutor dead on the first
>> connection loss (assuming you have a stable network environment), then it
>> can now detect lost TaskExecutors as fast as the heartbeat interval.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>>
>> Cheers,
>> Till
>>
>> On Fri, Jul 2, 2021 at 9:33 AM Gen Luo  wrote:
>>
>>> Thanks for sharing, Till and Yang.
>>>
>>> @Lu
>>> Sorry but I don't know how to explain the new test with the log. Let's
>>> wait for others' reply.
>>>
>>> @Till
>>> It would be nice if JIRAs could be fixed. Thanks again for proposing
>>> them.
>>>
>>> In addition, I was tracking an issue that RM keeps allocating and
>>> freeing slots after a TM lost until its heartbeat timeout, when I found the
>>> recovery costing as long as heartbeat timeout. That should be a minor bug
>>> introduced by declarative resource management. I have created a JIRA about
>>> the problem [1] and  we can discuss it there if necessary.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-23216
>>>
>>> Lu Niu wrote on Fri, Jul 2, 2021 at 3:13 AM:
>>>
 Another side question, Shall we add metric to cover the complete
 restarting time (phase 1 + phase 2)? Current metric jm.restartingTime only
 covers phase 1. Thanks!

 Best
 Lu

 On Thu, Jul 1, 2021 at 12:09 PM Lu Niu  wrote:

> Thanks TIll and Yang for help! Also Thanks Till for a quick fix!
>
> I did another test yesterday. In this test, I intentionally throw
> exception from the source operator:
> ```
> if (runtimeContext.getIndexOfThisSubtask() == 1
> && errorFrenquecyInMin > 0
> && System.currentTimeMillis() - lastStartTime >=
> errorFrenquecyInMin * 60 * 1000) {
>   lastStartTime = System.currentTimeMillis();
>   throw new RuntimeException(
>   "Trigger expected exception at: " + lastStartTime);
> }
> ```
> In this case, I found phase 1 still takes about 30s and Phase 2
> dropped to 1s (because no need for container allocation). Why phase 1 
> still
> takes 30s even though no TM is lost?
>
> Related logs:
> ```
> 2021-06-30 00:55:07,463 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
> USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
> java.lang.RuntimeException: Trigger expected exception at: 1625014507446
> 2021-06-30 00:55:07,509 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
> (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to
> RESTARTING.
> 2021-06-30 00:55:37,596 INFO

Re: Job Recovery Time on TM Lost

2021-07-05 Thread Gen Luo
As far as I know, a TM will report a connection failure once a TM it is
connected to is lost. I suppose the JM can trust that report and fail the
tasks on the lost TM if it also encounters a connection failure.

Of course, this won't work if the lost TM is standalone. But I suppose we can
use the same strategy as in the connected scenario: consider it possibly lost
on the first connection loss, and fail it if a double check also fails. The
major difference is that the senders of the probes are the same one rather
than two different roles, so the results may tend to be the same.

On the other hand, this fact also means that jobs can be fragile in an
unstable environment, no matter whether the failover is triggered by the TM
or the JM. So maybe it's not worth introducing extra configurations for
heartbeat fault tolerance, unless we also introduce some retry
strategies for netty connections.


On Fri, Jul 2, 2021 at 9:34 PM Till Rohrmann  wrote:

> Could you share the full logs with us for the second experiment, Lu? I
> cannot tell from the top of my head why it should take 30s unless you have
> configured a restart delay of 30s.
>
> Let's discuss FLINK-23216 on the JIRA ticket, Gen.
>
> I've now implemented FLINK-23209 [1] but it somehow has the problem that
> in a flakey environment you might not want to mark a TaskExecutor dead on
> the first connection loss. Maybe this is something we need to make
> configurable (e.g. introducing a threshold which admittedly is similar to
> the heartbeat timeout) so that the user can configure it for her
> environment. On the upside, if you mark the TaskExecutor dead on the first
> connection loss (assuming you have a stable network environment), then it
> can now detect lost TaskExecutors as fast as the heartbeat interval.
>
> [1] https://issues.apache.org/jira/browse/FLINK-23209
>
> Cheers,
> Till
>
> On Fri, Jul 2, 2021 at 9:33 AM Gen Luo  wrote:
>
>> Thanks for sharing, Till and Yang.
>>
>> @Lu
>> Sorry but I don't know how to explain the new test with the log. Let's
>> wait for others' reply.
>>
>> @Till
>> It would be nice if JIRAs could be fixed. Thanks again for proposing them.
>>
>> In addition, I was tracking an issue that RM keeps allocating and freeing
>> slots after a TM lost until its heartbeat timeout, when I found the
>> recovery costing as long as heartbeat timeout. That should be a minor bug
>> introduced by declarative resource management. I have created a JIRA about
>> the problem [1] and  we can discuss it there if necessary.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-23216
>>
>> Lu Niu wrote on Fri, Jul 2, 2021 at 3:13 AM:
>>
>>> Another side question, Shall we add metric to cover the complete
>>> restarting time (phase 1 + phase 2)? Current metric jm.restartingTime only
>>> covers phase 1. Thanks!
>>>
>>> Best
>>> Lu
>>>
>>> On Thu, Jul 1, 2021 at 12:09 PM Lu Niu  wrote:
>>>
 Thanks TIll and Yang for help! Also Thanks Till for a quick fix!

 I did another test yesterday. In this test, I intentionally throw
 exception from the source operator:
 ```
 if (runtimeContext.getIndexOfThisSubtask() == 1
 && errorFrenquecyInMin > 0
 && System.currentTimeMillis() - lastStartTime >=
 errorFrenquecyInMin * 60 * 1000) {
   lastStartTime = System.currentTimeMillis();
   throw new RuntimeException(
   "Trigger expected exception at: " + lastStartTime);
 }
 ```
 In this case, I found phase 1 still takes about 30s and Phase 2 dropped
 to 1s (because no need for container allocation). Why phase 1 still takes
 30s even though no TM is lost?

 Related logs:
 ```
 2021-06-30 00:55:07,463 INFO
  org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
 USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
 java.lang.RuntimeException: Trigger expected exception at: 1625014507446
 2021-06-30 00:55:07,509 INFO
  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
 NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
 (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to
 RESTARTING.
 2021-06-30 00:55:37,596 INFO
  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
 NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
 (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to
 RUNNING.
 2021-06-30 00:55:38,678 INFO
  org.apache.flink.runtime.executiongraph.ExecutionGraph(time when
 all tasks switch from CREATED to RUNNING)
 ```
 Best
 Lu


 On Thu, Jul 1, 2021 at 12:06 PM Lu Niu  wrote:

> Thanks TIll and Yang for help! Also Thanks Till for a quick fix!
>
> I did another test yesterday. In this test, I intentionally throw
> exception from the source operator:
> ```
> if (runtimeContext.getIndexOfThisSubtask() == 1
> && errorFrenquecyInMin 

Re: Job Recovery Time on TM Lost

2021-07-02 Thread Till Rohrmann
Could you share the full logs with us for the second experiment, Lu? I
cannot tell from the top of my head why it should take 30s unless you have
configured a restart delay of 30s.

Let's discuss FLINK-23216 on the JIRA ticket, Gen.

I've now implemented FLINK-23209 [1] but it somehow has the problem that in
a flakey environment you might not want to mark a TaskExecutor dead on the
first connection loss. Maybe this is something we need to make configurable
(e.g. introducing a threshold which admittedly is similar to the heartbeat
timeout) so that the user can configure it for her environment. On the
upside, if you mark the TaskExecutor dead on the first connection loss
(assuming you have a stable network environment), then it can now detect
lost TaskExecutors as fast as the heartbeat interval.

[1] https://issues.apache.org/jira/browse/FLINK-23209

Cheers,
Till

On Fri, Jul 2, 2021 at 9:33 AM Gen Luo  wrote:

> Thanks for sharing, Till and Yang.
>
> @Lu
> Sorry but I don't know how to explain the new test with the log. Let's
> wait for others' reply.
>
> @Till
> It would be nice if JIRAs could be fixed. Thanks again for proposing them.
>
> In addition, I was tracking an issue that RM keeps allocating and freeing
> slots after a TM lost until its heartbeat timeout, when I found the
> recovery costing as long as heartbeat timeout. That should be a minor bug
> introduced by declarative resource management. I have created a JIRA about
> the problem [1] and  we can discuss it there if necessary.
>
> [1] https://issues.apache.org/jira/browse/FLINK-23216
>
> Lu Niu wrote on Fri, Jul 2, 2021 at 3:13 AM:
>
>> Another side question, Shall we add metric to cover the complete
>> restarting time (phase 1 + phase 2)? Current metric jm.restartingTime only
>> covers phase 1. Thanks!
>>
>> Best
>> Lu
>>
>> On Thu, Jul 1, 2021 at 12:09 PM Lu Niu  wrote:
>>
>>> Thanks TIll and Yang for help! Also Thanks Till for a quick fix!
>>>
>>> I did another test yesterday. In this test, I intentionally throw
>>> exception from the source operator:
>>> ```
>>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>> && errorFrenquecyInMin > 0
>>> && System.currentTimeMillis() - lastStartTime >=
>>> errorFrenquecyInMin * 60 * 1000) {
>>>   lastStartTime = System.currentTimeMillis();
>>>   throw new RuntimeException(
>>>   "Trigger expected exception at: " + lastStartTime);
>>> }
>>> ```
>>> In this case, I found phase 1 still takes about 30s and Phase 2 dropped
>>> to 1s (because no need for container allocation). Why phase 1 still takes
>>> 30s even though no TM is lost?
>>>
>>> Related logs:
>>> ```
>>> 2021-06-30 00:55:07,463 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
>>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
>>> java.lang.RuntimeException: Trigger expected exception at: 1625014507446
>>> 2021-06-30 00:55:07,509 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>>> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
>>> (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to
>>> RESTARTING.
>>> 2021-06-30 00:55:37,596 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>>> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
>>> (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to
>>> RUNNING.
>>> 2021-06-30 00:55:38,678 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph(time when
>>> all tasks switch from CREATED to RUNNING)
>>> ```
>>> Best
>>> Lu
>>>
>>>
>>> On Thu, Jul 1, 2021 at 12:06 PM Lu Niu  wrote:
>>>
 Thanks TIll and Yang for help! Also Thanks Till for a quick fix!

 I did another test yesterday. In this test, I intentionally throw
 exception from the source operator:
 ```
 if (runtimeContext.getIndexOfThisSubtask() == 1
 && errorFrenquecyInMin > 0
 && System.currentTimeMillis() - lastStartTime >=
 errorFrenquecyInMin * 60 * 1000) {
   lastStartTime = System.currentTimeMillis();
   throw new RuntimeException(
   "Trigger expected exception at: " + lastStartTime);
 }
 ```
 In this case, I found phase 1 still takes about 30s and Phase 2 dropped
 to 1s (because no need for container allocation).

 Some logs:
 ```
 ```


 On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann 
 wrote:

> A quick addition, I think with FLINK-23202 it should now also be
> possible to improve the heartbeat mechanism in the general case. We can
> leverage the unreachability exception thrown if a remote target is no
> longer reachable to mark an heartbeat target as no longer reachable [1].
> This can then be considered as if the heartbeat timeout has been 
> triggered.
> That way we should detect lost TaskExecutors as fast as our heartbeat
> interval is.
>
> [1] 

Re: Job Recovery Time on TM Lost

2021-07-02 Thread Gen Luo
Thanks for sharing, Till and Yang.

@Lu
Sorry but I don't know how to explain the new test with the log. Let's wait
for others' reply.

@Till
It would be nice if JIRAs could be fixed. Thanks again for proposing them.

In addition, I was tracking an issue where the RM keeps allocating and
freeing slots after a TM is lost until its heartbeat times out, which is when
I found the recovery taking as long as the heartbeat timeout. That should be
a minor bug introduced by declarative resource management. I have created a
JIRA about the problem [1] and we can discuss it there if necessary.

[1] https://issues.apache.org/jira/browse/FLINK-23216

Lu Niu wrote on Fri, Jul 2, 2021 at 3:13 AM:

> Another side question, Shall we add metric to cover the complete
> restarting time (phase 1 + phase 2)? Current metric jm.restartingTime only
> covers phase 1. Thanks!
>
> Best
> Lu
>
> On Thu, Jul 1, 2021 at 12:09 PM Lu Niu  wrote:
>
>> Thanks TIll and Yang for help! Also Thanks Till for a quick fix!
>>
>> I did another test yesterday. In this test, I intentionally throw
>> exception from the source operator:
>> ```
>> if (runtimeContext.getIndexOfThisSubtask() == 1
>> && errorFrenquecyInMin > 0
>> && System.currentTimeMillis() - lastStartTime >=
>> errorFrenquecyInMin * 60 * 1000) {
>>   lastStartTime = System.currentTimeMillis();
>>   throw new RuntimeException(
>>   "Trigger expected exception at: " + lastStartTime);
>> }
>> ```
>> In this case, I found phase 1 still takes about 30s and Phase 2 dropped
>> to 1s (because no need for container allocation). Why phase 1 still takes
>> 30s even though no TM is lost?
>>
>> Related logs:
>> ```
>> 2021-06-30 00:55:07,463 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
>> java.lang.RuntimeException: Trigger expected exception at: 1625014507446
>> 2021-06-30 00:55:07,509 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
>> (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to
>> RESTARTING.
>> 2021-06-30 00:55:37,596 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
>> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
>> (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to
>> RUNNING.
>> 2021-06-30 00:55:38,678 INFO
>>  org.apache.flink.runtime.executiongraph.ExecutionGraph(time when
>> all tasks switch from CREATED to RUNNING)
>> ```
>> Best
>> Lu
>>
>>
>> On Thu, Jul 1, 2021 at 12:06 PM Lu Niu  wrote:
>>
>>> Thanks TIll and Yang for help! Also Thanks Till for a quick fix!
>>>
>>> I did another test yesterday. In this test, I intentionally throw
>>> exception from the source operator:
>>> ```
>>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>> && errorFrenquecyInMin > 0
>>> && System.currentTimeMillis() - lastStartTime >=
>>> errorFrenquecyInMin * 60 * 1000) {
>>>   lastStartTime = System.currentTimeMillis();
>>>   throw new RuntimeException(
>>>   "Trigger expected exception at: " + lastStartTime);
>>> }
>>> ```
>>> In this case, I found that phase 1 still takes about 30s and phase 2 dropped
>>> to 1s (because no container allocation is needed).
>>>
>>> Some logs:
>>> ```
>>> ```
>>>
>>>
>>> On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann 
>>> wrote:
>>>
 A quick addition, I think with FLINK-23202 it should now also be
 possible to improve the heartbeat mechanism in the general case. We can
 leverage the unreachability exception thrown if a remote target is no
 longer reachable to mark a heartbeat target as no longer reachable [1].
 This can then be considered as if the heartbeat timeout has been triggered.
 That way we should detect lost TaskExecutors as fast as our heartbeat
 interval is.

 [1] https://issues.apache.org/jira/browse/FLINK-23209

 Cheers,
 Till

 On Thu, Jul 1, 2021 at 1:46 PM Yang Wang  wrote:

> Since you are deploying Flink workloads on Yarn, the Flink
> ResourceManager should get the container
> completion event after the heartbeat of Yarn NM->Yarn RM->Flink RM,
> which is 8 seconds by default.
> And Flink ResourceManager will release the dead TaskManager container
> once it receives the completion event.
> As a result, Flink will not deploy tasks onto the dead TaskManagers.
>
>
> I think most of the time cost in Phase 1 might be cancelling the tasks
> on the dead TaskManagers.
>
>
> Best,
> Yang
>
>
> Till Rohrmann wrote on Thursday, July 1, 2021 at 4:49 PM:
>
>> The analysis of Gen is correct. Flink currently uses its heartbeat as
>> the primary means to detect dead TaskManagers. This means that Flink will
>> take at least `heartbeat.timeout` time before the system recovers. Even 
>> if
>> the cancellation happens fast (e.g. by 

Re: Job Recovery Time on TM Lost

2021-07-01 Thread Lu Niu
Another side question: shall we add a metric to cover the complete restarting
time (phase 1 + phase 2)? The current metric jm.restartingTime only covers
phase 1. Thanks!

Best
Lu
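
Until such a metric exists, one rough way to measure phase 1 + phase 2 from the
outside is to poll the JobManager's REST API and time how long the job, and all of
its task vertices, stay away from RUNNING. A minimal sketch follows; the host, port,
polling interval and the JSON field access are assumptions based on the standard
/jobs/:jobid response, and the class is purely illustrative rather than part of Flink:

```
// Rough external probe (sketch only): measures the time from the job leaving
// RUNNING until the job and every task vertex report RUNNING again, by polling
// the JobManager REST endpoint /jobs/<jobid>. Host, port and the Jackson
// dependency are assumptions; pass the job id as the first argument.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RecoveryTimeProbe {
    public static void main(String[] args) throws Exception {
        String jobUrl = "http://jobmanager:8081/jobs/" + args[0];
        HttpClient client = HttpClient.newHttpClient();
        ObjectMapper mapper = new ObjectMapper();
        Long outageStart = null;

        while (true) {
            HttpResponse<String> resp = client.send(
                    HttpRequest.newBuilder(URI.create(jobUrl)).GET().build(),
                    HttpResponse.BodyHandlers.ofString());
            JsonNode job = mapper.readTree(resp.body());

            boolean jobRunning = "RUNNING".equals(job.get("state").asText());
            boolean allTasksRunning = true;
            for (JsonNode vertex : job.get("vertices")) {
                allTasksRunning &= "RUNNING".equals(vertex.get("status").asText());
            }

            if (!jobRunning && outageStart == null) {
                outageStart = System.currentTimeMillis();   // phase 1 starts
            } else if (jobRunning && allTasksRunning && outageStart != null) {
                System.out.println("full recovery took "
                        + (System.currentTimeMillis() - outageStart) + " ms");
                outageStart = null;                          // arm for the next restart
            }
            Thread.sleep(200);                               // polling granularity
        }
    }
}
```

The polling interval bounds the precision, but it is usually enough to compare
configurations against each other.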

On Thu, Jul 1, 2021 at 12:09 PM Lu Niu  wrote:

> Thanks Till and Yang for the help! Also thanks Till for the quick fix!
>
> I did another test yesterday. In this test, I intentionally throw an
> exception from the source operator:
> ```
> if (runtimeContext.getIndexOfThisSubtask() == 1
> && errorFrenquecyInMin > 0
> && System.currentTimeMillis() - lastStartTime >=
> errorFrenquecyInMin * 60 * 1000) {
>   lastStartTime = System.currentTimeMillis();
>   throw new RuntimeException(
>   "Trigger expected exception at: " + lastStartTime);
> }
> ```
> In this case, I found that phase 1 still takes about 30s and phase 2 dropped to
> 1s (because no container allocation is needed). Why does phase 1 still take 30s
> even though no TM is lost?
>
> Related logs:
> ```
> 2021-06-30 00:55:07,463 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
> USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
> java.lang.RuntimeException: Trigger expected exception at: 1625014507446
> 2021-06-30 00:55:07,509 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
> (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to
> RESTARTING.
> 2021-06-30 00:55:37,596 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
> (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to
> RUNNING.
> 2021-06-30 00:55:38,678 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph(time when
> all tasks switch from CREATED to RUNNING)
> ```
> Best
> Lu
>
>
> On Thu, Jul 1, 2021 at 12:06 PM Lu Niu  wrote:
>
>> Thanks Till and Yang for the help! Also thanks Till for the quick fix!
>>
>> I did another test yesterday. In this test, I intentionally throw an
>> exception from the source operator:
>> ```
>> if (runtimeContext.getIndexOfThisSubtask() == 1
>> && errorFrenquecyInMin > 0
>> && System.currentTimeMillis() - lastStartTime >=
>> errorFrenquecyInMin * 60 * 1000) {
>>   lastStartTime = System.currentTimeMillis();
>>   throw new RuntimeException(
>>   "Trigger expected exception at: " + lastStartTime);
>> }
>> ```
>> In this case, I found that phase 1 still takes about 30s and phase 2 dropped
>> to 1s (because no container allocation is needed).
>>
>> Some logs:
>> ```
>> ```
>>
>>
>> On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann 
>> wrote:
>>
>>> A quick addition, I think with FLINK-23202 it should now also be
>>> possible to improve the heartbeat mechanism in the general case. We can
>>> leverage the unreachability exception thrown if a remote target is no
>>> longer reachable to mark a heartbeat target as no longer reachable [1].
>>> This can then be considered as if the heartbeat timeout has been triggered.
>>> That way we should detect lost TaskExecutors as fast as our heartbeat
>>> interval is.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang  wrote:
>>>
 Since you are deploying Flink workloads on Yarn, the Flink
 ResourceManager should get the container
 completion event after the heartbeat of Yarn NM->Yarn RM->Flink RM,
 which is 8 seconds by default.
 And Flink ResourceManager will release the dead TaskManager container
 once it receives the completion event.
 As a result, Flink will not deploy tasks onto the dead TaskManagers.


 I think most of the time cost in Phase 1 might be cancelling the tasks
 on the dead TaskManagers.


 Best,
 Yang


 Till Rohrmann wrote on Thursday, July 1, 2021 at 4:49 PM:

> The analysis of Gen is correct. Flink currently uses its heartbeat as
> the primary means to detect dead TaskManagers. This means that Flink will
> take at least `heartbeat.timeout` time before the system recovers. Even if
> the cancellation happens fast (e.g. by having configured a low
> akka.ask.timeout), then Flink will still try to deploy tasks onto the dead
> TaskManager until it is marked as dead and its slots are released (unless
> the ResourceManager gets a signal from the underlying resource
> management system that a container/pod has died). One way to improve the
> situation is to introduce logic which can react to a ConnectionException
> and then blacklist or release a TaskManager, for example. This is
> currently not implemented in Flink, though.
>
> Concerning the cancellation operation: Flink currently does not listen
> to the dead letters of Akka. This means that the `akka.ask.timeout` is the
> primary means to fail the future result of an RPC which could not be sent.

Re: Job Recovery Time on TM Lost

2021-07-01 Thread Lu Niu
Thanks Till and Yang for the help! Also thanks Till for the quick fix!

I did another test yesterday. In this test, I intentionally throw an exception
from the source operator:
```
if (runtimeContext.getIndexOfThisSubtask() == 1
&& errorFrenquecyInMin > 0
&& System.currentTimeMillis() - lastStartTime >=
errorFrenquecyInMin * 60 * 1000) {
  lastStartTime = System.currentTimeMillis();
  throw new RuntimeException(
  "Trigger expected exception at: " + lastStartTime);
}
```
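(For anyone who wants to reproduce this test, a self-contained sketch of such a
deliberately failing source is shown below; the class name FailingSource and the
field errorFrequencyMin are illustrative stand-ins, not the actual job's code.)
```
// A minimal sketch of a parallel source where exactly one subtask throws every
// N minutes, so a single task failure triggers the restart path under test.
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

public class FailingSource extends RichParallelSourceFunction<Long> {
    private final long errorFrequencyMin;          // minutes between injected failures
    private volatile boolean running = true;
    private long lastStartTime;

    public FailingSource(long errorFrequencyMin) {
        this.errorFrequencyMin = errorFrequencyMin;
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        lastStartTime = System.currentTimeMillis();
        long counter = 0;
        while (running) {
            // Only subtask 1 fails, mimicking an application-level exception.
            if (getRuntimeContext().getIndexOfThisSubtask() == 1
                    && errorFrequencyMin > 0
                    && System.currentTimeMillis() - lastStartTime >= errorFrequencyMin * 60 * 1000) {
                lastStartTime = System.currentTimeMillis();
                throw new RuntimeException("Trigger expected exception at: " + lastStartTime);
            }
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter++);            // emit dummy records in between
            }
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```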
In this case, I found that phase 1 still takes about 30s and phase 2 dropped to
1s (because no container allocation is needed). Why does phase 1 still take 30s
even though no TM is lost?

Related logs:
```
2021-06-30 00:55:07,463 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
java.lang.RuntimeException: Trigger expected exception at: 1625014507446
2021-06-30 00:55:07,509 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
(35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to
RESTARTING.
2021-06-30 00:55:37,596 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
(35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to
RUNNING.
2021-06-30 00:55:38,678 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph(time when
all tasks switch from CREATED to RUNNING)
```
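(Reading the timestamps above: phase 1, from RUNNING -> RESTARTING at
00:55:07,509 to RESTARTING -> RUNNING at 00:55:37,596, is about 30.1 s; phase 2,
until all tasks are RUNNING at 00:55:38,678, is about 1.1 s.)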
Best
Lu


On Thu, Jul 1, 2021 at 12:06 PM Lu Niu  wrote:

> Thanks Till and Yang for the help! Also thanks Till for the quick fix!
>
> I did another test yesterday. In this test, I intentionally throw an
> exception from the source operator:
> ```
> if (runtimeContext.getIndexOfThisSubtask() == 1
> && errorFrenquecyInMin > 0
> && System.currentTimeMillis() - lastStartTime >=
> errorFrenquecyInMin * 60 * 1000) {
>   lastStartTime = System.currentTimeMillis();
>   throw new RuntimeException(
>   "Trigger expected exception at: " + lastStartTime);
> }
> ```
> In this case, I found that phase 1 still takes about 30s and phase 2 dropped to
> 1s (because no container allocation is needed).
>
> Some logs:
> ```
> ```
>
>
> On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann  wrote:
>
>> A quick addition, I think with FLINK-23202 it should now also be possible
>> to improve the heartbeat mechanism in the general case. We can leverage the
>> unreachability exception thrown if a remote target is no longer reachable
>> to mark a heartbeat target as no longer reachable [1]. This can then be
>> considered as if the heartbeat timeout has been triggered. That way we
>> should detect lost TaskExecutors as fast as our heartbeat interval is.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>>
>> Cheers,
>> Till
>>
>> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang  wrote:
>>
>>> Since you are deploying Flink workloads on Yarn, the Flink
>>> ResourceManager should get the container
>>> completion event after the heartbeat of Yarn NM->Yarn RM->Flink RM,
>>> which is 8 seconds by default.
>>> And Flink ResourceManager will release the dead TaskManager container
>>> once it receives the completion event.
>>> As a result, Flink will not deploy tasks onto the dead TaskManagers.
>>>
>>>
>>> I think most of the time cost in Phase 1 might be cancelling the tasks
>>> on the dead TaskManagers.
>>>
>>>
>>> Best,
>>> Yang
>>>
>>>
>>> Till Rohrmann wrote on Thursday, July 1, 2021 at 4:49 PM:
>>>
 The analysis of Gen is correct. Flink currently uses its heartbeat as
 the primary means to detect dead TaskManagers. This means that Flink will
 take at least `heartbeat.timeout` time before the system recovers. Even if
 the cancellation happens fast (e.g. by having configured a low
 akka.ask.timeout), then Flink will still try to deploy tasks onto the dead
 TaskManager until it is marked as dead and its slots are released (unless
 the ResourceManager gets a signal from the underlying resource
 management system that a container/pod has died). One way to improve the
 situation is to introduce logic which can react to a ConnectionException
 and then blacklist or release a TaskManager, for example. This is
 currently not implemented in Flink, though.

 Concerning the cancellation operation: Flink currently does not listen
 to the dead letters of Akka. This means that the `akka.ask.timeout` is the
 primary means to fail the future result of an RPC which could not be sent.
 This is also an improvement we should add to Flink's RpcService. I've
 created a JIRA issue for this problem [1].

 [1] https://issues.apache.org/jira/browse/FLINK-23202

 Cheers,
 Till

 On Wed, Jun 30, 2021 at 6:33 PM Lu Niu  wrote:

> Thanks Gen! cc flink-dev to collect more inputs.
>
> Best
> Lu
>
> On Wed, Jun 30, 

Re: Job Recovery Time on TM Lost

2021-07-01 Thread Lu Niu
Thanks Till and Yang for the help! Also thanks Till for the quick fix!

I did another test yesterday. In this test, I intentionally throw an exception
from the source operator:
```
if (runtimeContext.getIndexOfThisSubtask() == 1
&& errorFrenquecyInMin > 0
&& System.currentTimeMillis() - lastStartTime >=
errorFrenquecyInMin * 60 * 1000) {
  lastStartTime = System.currentTimeMillis();
  throw new RuntimeException(
  "Trigger expected exception at: " + lastStartTime);
}
```
In this case, I found that phase 1 still takes about 30s and phase 2 dropped to
1s (because no container allocation is needed).

Some logs:
```
```


On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann  wrote:

> A quick addition, I think with FLINK-23202 it should now also be possible
> to improve the heartbeat mechanism in the general case. We can leverage the
> unreachability exception thrown if a remote target is no longer reachable
> to mark a heartbeat target as no longer reachable [1]. This can then be
> considered as if the heartbeat timeout has been triggered. That way we
> should detect lost TaskExecutors as fast as our heartbeat interval is.
>
> [1] https://issues.apache.org/jira/browse/FLINK-23209
>
> Cheers,
> Till
>
> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang  wrote:
>
>> Since you are deploying Flink workloads on Yarn, the Flink
>> ResourceManager should get the container
>> completion event after the heartbeat of Yarn NM->Yarn RM->Flink RM, which
>> is 8 seconds by default.
>> And Flink ResourceManager will release the dead TaskManager container
>> once it receives the completion event.
>> As a result, Flink will not deploy tasks onto the dead TaskManagers.
>>
>>
>> I think most of the time cost in Phase 1 might be cancelling the tasks on
>> the dead TaskManagers.
>>
>>
>> Best,
>> Yang
>>
>>
>> Till Rohrmann wrote on Thursday, July 1, 2021 at 4:49 PM:
>>
>>> The analysis of Gen is correct. Flink currently uses its heartbeat as
>>> the primary means to detect dead TaskManagers. This means that Flink will
>>> take at least `heartbeat.timeout` time before the system recovers. Even if
>>> the cancellation happens fast (e.g. by having configured a low
>>> akka.ask.timeout), then Flink will still try to deploy tasks onto the dead
>>> TaskManager until it is marked as dead and its slots are released (unless
>>> the ResourceManager gets a signal from the underlying resource
>>> management system that a container/pod has died). One way to improve the
>>> situation is to introduce logic which can react to a ConnectionException
>>> and then blacklist or release a TaskManager, for example. This is
>>> currently not implemented in Flink, though.
>>>
>>> Concerning the cancellation operation: Flink currently does not listen
>>> to the dead letters of Akka. This means that the `akka.ask.timeout` is the
>>> primary means to fail the future result of an RPC which could not be sent.
>>> This is also an improvement we should add to Flink's RpcService. I've
>>> created a JIRA issue for this problem [1].
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-23202
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu  wrote:
>>>
 Thanks Gen! cc flink-dev to collect more inputs.

 Best
 Lu

 On Wed, Jun 30, 2021 at 12:55 AM Gen Luo  wrote:

> I'm also wondering here.
>
> In my opinion, it's because the JM cannot confirm whether the TM is
> lost or it's just a temporary network problem that will recover soon, since I
> can see in the log that akka has got a Connection refused but JM still sends a
> heartbeat request to the lost TM until it reaches heartbeat timeout. But
> I'm not sure if it's indeed designed like this.
>
> I would really appreciate it if anyone who knows more details could
> answer. Thanks.
>



Re: Job Recovery Time on TM Lost

2021-07-01 Thread Till Rohrmann
A quick addition, I think with FLINK-23202 it should now also be possible
to improve the heartbeat mechanism in the general case. We can leverage the
unreachability exception thrown if a remote target is no longer reachable
to mark a heartbeat target as no longer reachable [1]. This can then be
considered as if the heartbeat timeout has been triggered. That way we
should detect lost TaskExecutors as fast as our heartbeat interval is.

[1] https://issues.apache.org/jira/browse/FLINK-23209

Cheers,
Till
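
To make the idea concrete, here is a purely hypothetical sketch; none of these types
are Flink's actual heartbeat classes, they only illustrate treating an "unreachable"
signal like an immediate heartbeat timeout instead of waiting for heartbeat.timeout
to expire:

```
// Purely hypothetical sketch: these interfaces are NOT Flink's heartbeat classes.
import java.net.ConnectException;

interface HeartbeatListener {
    void notifyHeartbeatTimeout(String targetId);      // normal timeout path
    void notifyTargetUnreachable(String targetId);     // fast path on unreachability
}

final class HeartbeatMonitorSketch {
    private final HeartbeatListener listener;

    HeartbeatMonitorSketch(HeartbeatListener listener) {
        this.listener = listener;
    }

    /** Called whenever a heartbeat RPC to the target fails. */
    void onHeartbeatRpcFailure(String targetId, Throwable failure) {
        if (isUnreachable(failure)) {
            // Treat the target as dead immediately, as if the timeout had fired.
            listener.notifyTargetUnreachable(targetId);
        }
        // Otherwise keep waiting; the regular heartbeat.timeout still applies.
    }

    private boolean isUnreachable(Throwable failure) {
        // In the real system this would check for the dedicated unreachability
        // exception introduced with FLINK-23202; a ConnectException stands in here.
        return failure instanceof ConnectException;
    }
}
```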

On Thu, Jul 1, 2021 at 1:46 PM Yang Wang  wrote:

> Since you are deploying Flink workloads on Yarn, the Flink ResourceManager
> should get the container
> completion event after the heartbeat of Yarn NM->Yarn RM->Flink RM, which
> is 8 seconds by default.
> And Flink ResourceManager will release the dead TaskManager container once
> it receives the completion event.
> As a result, Flink will not deploy tasks onto the dead TaskManagers.
>
>
> I think most of the time cost in Phase 1 might be cancelling the tasks on
> the dead TaskManagers.
>
>
> Best,
> Yang
>
>
> Till Rohrmann wrote on Thursday, July 1, 2021 at 4:49 PM:
>
>> The analysis of Gen is correct. Flink currently uses its heartbeat as the
>> primary means to detect dead TaskManagers. This means that Flink will take
>> at least `heartbeat.timeout` time before the system recovers. Even if the
>> cancellation happens fast (e.g. by having configured a low
>> akka.ask.timeout), then Flink will still try to deploy tasks onto the dead
>> TaskManager until it is marked as dead and its slots are released (unless
>> the ResourceManager gets a signal from the underlying resource
>> management system that a container/pod has died). One way to improve the
>> situation is to introduce logic which can react to a ConnectionException
>> and then blacklist or release a TaskManager, for example. This is
>> currently not implemented in Flink, though.
>>
>> Concerning the cancellation operation: Flink currently does not listen to
>> the dead letters of Akka. This means that the `akka.ask.timeout` is the
>> primary means to fail the future result of an RPC which could not be sent.
>> This is also an improvement we should add to Flink's RpcService. I've
>> created a JIRA issue for this problem [1].
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-23202
>>
>> Cheers,
>> Till
>>
>> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu  wrote:
>>
>>> Thanks Gen! cc flink-dev to collect more inputs.
>>>
>>> Best
>>> Lu
>>>
>>> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo  wrote:
>>>
 I'm also wondering here.

 In my opinion, it's because the JM cannot confirm whether the TM is
 lost or it's just a temporary network problem that will recover soon, since I can
 see in the log that akka has got a Connection refused but JM still sends a
 heartbeat request to the lost TM until it reaches heartbeat timeout. But
 I'm not sure if it's indeed designed like this.

 I would really appreciate it if anyone who knows more details could
 answer. Thanks.

>>>


Re: Job Recovery Time on TM Lost

2021-07-01 Thread Yang Wang
Since you are deploying Flink workloads on Yarn, the Flink ResourceManager
should get the container
completion event after the heartbeat of Yarn NM->Yarn RM->Flink RM, which
is 8 seconds by default.
And Flink ResourceManager will release the dead TaskManager container once
it receives the completion event.
As a result, Flink will not deploy tasks onto the dead TaskManagers.


I think most of the time cost in Phase 1 might be cancelling the tasks on
the dead TaskManagers.


Best,
Yang


Till Rohrmann wrote on Thursday, July 1, 2021 at 4:49 PM:

> The analysis of Gen is correct. Flink currently uses its heartbeat as the
> primary means to detect dead TaskManagers. This means that Flink will take
> at least `heartbeat.timeout` time before the system recovers. Even if the
> cancellation happens fast (e.g. by having configured a low
> akka.ask.timeout), then Flink will still try to deploy tasks onto the dead
> TaskManager until it is marked as dead and its slots are released (unless
> the ResourceManager gets a signal from the underlying resource
> management system that a container/pod has died). One way to improve the
> situation is to introduce logic which can react to a ConnectionException
> and then blacklist or release a TaskManager, for example. This is
> currently not implemented in Flink, though.
>
> Concerning the cancellation operation: Flink currently does not listen to
> the dead letters of Akka. This means that the `akka.ask.timeout` is the
> primary means to fail the future result of an RPC which could not be sent.
> This is also an improvement we should add to Flink's RpcService. I've
> created a JIRA issue for this problem [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-23202
>
> Cheers,
> Till
>
> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu  wrote:
>
>> Thanks Gen! cc flink-dev to collect more inputs.
>>
>> Best
>> Lu
>>
>> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo  wrote:
>>
>>> I'm also wondering here.
>>>
>>> In my opinion, it's because the JM cannot confirm whether the TM is
>>> lost or it's just a temporary network problem that will recover soon, since I can
>>> see in the log that akka has got a Connection refused but JM still sends a
>>> heartbeat request to the lost TM until it reaches heartbeat timeout. But
>>> I'm not sure if it's indeed designed like this.
>>>
>>> I would really appreciate it if anyone who knows more details could
>>> answer. Thanks.
>>>
>>


Re: Job Recovery Time on TM Lost

2021-07-01 Thread Till Rohrmann
The analysis of Gen is correct. Flink currently uses its heartbeat as the
primary means to detect dead TaskManagers. This means that Flink will take
at least `heartbeat.timeout` time before the system recovers. Even if the
cancellation happens fast (e.g. by having configured a low
akka.ask.timeout), then Flink will still try to deploy tasks onto the dead
TaskManager until it is marked as dead and its slots are released (unless
the ResourceManager gets a signal from the underlying resource
management system that a container/pod has died). One way to improve the
situation is to introduce logic which can react to a ConnectionException
and then blacklist or release a TaskManager, for example. This is
currently not implemented in Flink, though.
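
For reference, the knobs involved in this trade-off are ordinary Flink configuration
settings; the values below only illustrate the direction one would tune them in and
are not recommendations:

```
# flink-conf.yaml (illustrative values, not recommendations)

# How often heartbeats are sent and how long before an unresponsive TM is declared
# dead. Lower values detect lost TMs faster, at the cost of more RPC traffic and a
# higher risk of false positives under GC pauses or transient network issues.
heartbeat.interval: 2000
heartbeat.timeout: 10000

# Bounds how long RPCs such as task cancellation may hang.
akka.ask.timeout: 10 s

# Keep the restart delay itself small so it does not dominate recovery time.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647
restart-strategy.fixed-delay.delay: 1 s
```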

Concerning the cancellation operation: Flink currently does not listen to
the dead letters of Akka. This means that the `akka.ask.timeout` is the
primary means to fail the future result of an RPC which could not be sent.
This is also an improvement we should add to Flink's RpcService. I've
created a JIRA issue for this problem [1].

[1] https://issues.apache.org/jira/browse/FLINK-23202

Cheers,
Till

On Wed, Jun 30, 2021 at 6:33 PM Lu Niu  wrote:

> Thanks Gen! cc flink-dev to collect more inputs.
>
> Best
> Lu
>
> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo  wrote:
>
>> I'm also wondering here.
>>
>> In my opinion, it's because the JM cannot confirm whether the TM is lost
>> or it's just a temporary network problem that will recover soon, since I can see
>> in the log that akka has got a Connection refused but JM still sends a
>> heartbeat request to the lost TM until it reaches heartbeat timeout. But
>> I'm not sure if it's indeed designed like this.
>>
>> I would really appreciate it if anyone who knows more details could
>> answer. Thanks.
>>
>


Re: Job Recovery Time on TM Lost

2021-06-30 Thread Lu Niu
Thanks Gen! cc flink-dev to collect more inputs.

Best
Lu

On Wed, Jun 30, 2021 at 12:55 AM Gen Luo  wrote:

> I'm also wondering here.
>
> In my opinion, it's because the JM cannot confirm whether the TM is lost
> or it's just a temporary network problem that will recover soon, since I can see
> in the log that akka has got a Connection refused but JM still sends a
> heartbeat request to the lost TM until it reaches heartbeat timeout. But
> I'm not sure if it's indeed designed like this.
>
> I would really appreciate it if anyone who knows more details could
> answer. Thanks.
>