Thanks Till and Yang for the help! Also, thanks Till for the quick fix!

I ran another test yesterday. In this test, I intentionally throw an exception from the source operator:

```
// Fail only subtask 1, at most once every errorFrenquecyInMin minutes.
if (runtimeContext.getIndexOfThisSubtask() == 1
        && errorFrenquecyInMin > 0
        && System.currentTimeMillis() - lastStartTime >= errorFrenquecyInMin * 60 * 1000) {
    lastStartTime = System.currentTimeMillis();
    throw new RuntimeException(
            "Trigger expected exception at: " + lastStartTime);
}
```

In this case, I found that Phase 1 still takes about 30s, while Phase 2 dropped to 1s (because no container allocation is needed).
Some logs:

```
```

On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann <trohrm...@apache.org> wrote:

> A quick addition, I think with FLINK-23202 it should now also be possible
> to improve the heartbeat mechanism in the general case. We can leverage the
> unreachability exception thrown if a remote target is no longer reachable
> to mark a heartbeat target as no longer reachable [1]. This can then be
> considered as if the heartbeat timeout has been triggered. That way we
> should detect lost TaskExecutors as fast as our heartbeat interval is.
>
> [1] https://issues.apache.org/jira/browse/FLINK-23209
>
> Cheers,
> Till
>
> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang <danrtsey...@gmail.com> wrote:
>
>> Since you are deploying Flink workloads on Yarn, the Flink
>> ResourceManager should get the container completion event after the
>> heartbeat of Yarn NM->Yarn RM->Flink RM, which is 8 seconds by default.
>> The Flink ResourceManager will release the dead TaskManager container
>> once it receives the completion event.
>> As a result, Flink will not deploy tasks onto the dead TaskManagers.
>>
>> I think most of the time cost in Phase 1 might be cancelling the tasks on
>> the dead TaskManagers.
>>
>> Best,
>> Yang
>>
>> Till Rohrmann <trohrm...@apache.org> wrote on Thu, Jul 1, 2021 at 4:49 PM:
>>
>>> The analysis of Gen is correct. Flink currently uses its heartbeat as
>>> the primary means to detect dead TaskManagers. This means that Flink will
>>> take at least `heartbeat.timeout` time before the system recovers. Even if
>>> the cancellation happens fast (e.g. by having configured a low
>>> akka.ask.timeout), then Flink will still try to deploy tasks onto the dead
>>> TaskManager until it is marked as dead and its slots are released (unless
>>> the ResourceManager gets a signal from the underlying resource
>>> management system that a container/pod has died).
>>> One way to improve the
>>> situation is to introduce logic which can react to a ConnectionException
>>> and then blacklists or releases a TaskManager, for example. This is
>>> currently not implemented in Flink, though.
>>>
>>> Concerning the cancellation operation: Flink currently does not listen
>>> to the dead letters of Akka. This means that the `akka.ask.timeout` is the
>>> primary means to fail the future result of an RPC which could not be sent.
>>> This is also an improvement we should add to Flink's RpcService. I've
>>> created a JIRA issue for this problem [1].
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-23202
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu <qqib...@gmail.com> wrote:
>>>
>>>> Thanks Gen! cc flink-dev to collect more inputs.
>>>>
>>>> Best
>>>> Lu
>>>>
>>>> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo <luogen...@gmail.com> wrote:
>>>>
>>>>> I'm also wondering here.
>>>>>
>>>>> In my opinion, it's because the JM cannot confirm whether the TM is
>>>>> lost or it's a temporary network trouble and will recover soon, since I
>>>>> can see in the log that akka has got a Connection refused but JM still
>>>>> sends a heartbeat request to the lost TM until it reaches heartbeat
>>>>> timeout. But I'm not sure if it's indeed designed like this.
>>>>>
>>>>> I would really appreciate it if anyone who knows more details could
>>>>> answer. Thanks.
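
For readers following the thread, the idea discussed above (treating a failed connection to a TaskManager as if its heartbeat timeout had already fired, instead of waiting out the full timeout) can be sketched roughly as below. This is only an illustration under assumptions: `HeartbeatTrackerSketch` and all of its methods are hypothetical names, it is not Flink's heartbeat implementation, and time is passed in explicitly so the behaviour is easy to follow.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch (not Flink code): tracks heartbeat targets and lets an
 * "unreachable" signal short-circuit the normal heartbeat timeout.
 */
final class HeartbeatTrackerSketch {
    private final long timeoutMillis;
    // Last time each live target answered a heartbeat.
    private final Map<String, Long> lastSeen = new HashMap<>();
    // Targets that have already been declared dead.
    private final Set<String> dead = new HashSet<>();

    HeartbeatTrackerSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Starts monitoring a target, e.g. a TaskManager id. */
    void register(String targetId, long nowMillis) {
        dead.remove(targetId);
        lastSeen.put(targetId, nowMillis);
    }

    /** Records a heartbeat response from a target that is still monitored. */
    void onHeartbeatReceived(String targetId, long nowMillis) {
        if (lastSeen.containsKey(targetId)) {
            lastSeen.put(targetId, nowMillis);
        }
    }

    /**
     * Called when a message to the target fails with a connection error.
     * The target is marked dead immediately, as if its timeout had fired.
     */
    void onTargetUnreachable(String targetId) {
        if (lastSeen.remove(targetId) != null) {
            dead.add(targetId);
        }
    }

    /** Returns true if the target was marked unreachable or has timed out. */
    boolean isDead(String targetId, long nowMillis) {
        if (dead.contains(targetId)) {
            return true;
        }
        Long seen = lastSeen.get(targetId);
        if (seen != null && nowMillis - seen >= timeoutMillis) {
            lastSeen.remove(targetId);
            dead.add(targetId);
            return true;
        }
        return false;
    }
}
```

With an illustrative timeout of 50s, a target registered at time 0 would normally be reported dead only at 50s; calling `onTargetUnreachable` at, say, 10s makes `isDead` return true right away, which is the faster detection the thread is after. A real implementation would also have to decide whether a single connection failure is enough evidence, which is the concern Gen raised about temporary network trouble.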