Thanks Till and Yang for the help! Also thanks Till for the quick fix!

I did another test yesterday. In this test, I intentionally throw an exception
from the source operator:
```
if (runtimeContext.getIndexOfThisSubtask() == 1
        && errorFrequencyInMin > 0
        && System.currentTimeMillis() - lastStartTime >= errorFrequencyInMin * 60 * 1000) {
    lastStartTime = System.currentTimeMillis();
    throw new RuntimeException("Trigger expected exception at: " + lastStartTime);
}
```
In this case, I found Phase 1 still takes about 30s, while Phase 2 dropped to
about 1s (because no container allocation is needed).

Some logs:
```
```
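
For context, here is a minimal, self-contained sketch of how such an error-injecting source might look. The class name, the emission loop, and the constructor are illustrative assumptions, not the exact job code; only the failure condition mirrors the snippet above.
```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

// Illustrative sketch: a parallel source where only subtask 1 throws an
// exception every errorFrequencyInMin minutes.
public class ErrorInjectingSource extends RichParallelSourceFunction<Long> {

    private final long errorFrequencyInMin;
    private long lastStartTime;
    private volatile boolean running = true;

    public ErrorInjectingSource(long errorFrequencyInMin) {
        this.errorFrequencyInMin = errorFrequencyInMin;
    }

    @Override
    public void open(Configuration parameters) {
        lastStartTime = System.currentTimeMillis();
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long counter = 0;
        while (running) {
            // Only subtask 1 fails, so a single TaskManager is affected.
            if (getRuntimeContext().getIndexOfThisSubtask() == 1
                    && errorFrequencyInMin > 0
                    && System.currentTimeMillis() - lastStartTime
                            >= errorFrequencyInMin * 60 * 1000) {
                lastStartTime = System.currentTimeMillis();
                throw new RuntimeException("Trigger expected exception at: " + lastStartTime);
            }
            // Emit a dummy record under the checkpoint lock.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter++);
            }
            Thread.sleep(10);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```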


On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann <trohrm...@apache.org> wrote:

> A quick addition: I think with FLINK-23202 it should now also be possible
> to improve the heartbeat mechanism in the general case. We can leverage the
> unreachability exception thrown when a remote target is no longer reachable
> to mark a heartbeat target as no longer reachable [1]. This can then be
> treated as if the heartbeat timeout had been triggered. That way we should
> detect lost TaskExecutors as quickly as our heartbeat interval allows.
>
> [1] https://issues.apache.org/jira/browse/FLINK-23209
>
> Cheers,
> Till
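
The idea behind FLINK-23209 can be illustrated with a rough sketch. All class and method names below are hypothetical stand-ins invented for illustration; this is not Flink's actual heartbeat or RPC API. The point is only that an unreachability failure on a heartbeat RPC can be escalated immediately instead of waiting for the full heartbeat timeout.
```
import java.util.concurrent.CompletableFuture;

// All names below are hypothetical, for illustration only.
interface HeartbeatTargetStub {
    CompletableFuture<Void> requestHeartbeat();
}

interface HeartbeatListenerStub {
    void notifyTargetUnreachable(String targetId);
}

class UnreachableAwareHeartbeatMonitor {

    // Stand-in for the unreachability exception the RPC layer would throw.
    static class RecipientUnreachableException extends RuntimeException {}

    private final String targetId;
    private final HeartbeatTargetStub target;
    private final HeartbeatListenerStub listener;

    UnreachableAwareHeartbeatMonitor(
            String targetId, HeartbeatTargetStub target, HeartbeatListenerStub listener) {
        this.targetId = targetId;
        this.target = target;
        this.listener = listener;
    }

    /** Called once per heartbeat interval. */
    void requestHeartbeat() {
        target.requestHeartbeat().whenComplete((ignored, failure) -> {
            // A real implementation would also unwrap CompletionException here.
            if (failure instanceof RecipientUnreachableException) {
                // The remote endpoint is gone: treat this like an expired
                // heartbeat instead of waiting for the full heartbeat timeout.
                listener.notifyTargetUnreachable(targetId);
            }
            // Otherwise, fall back to the normal timeout-based detection.
        });
    }
}
```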
>
> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang <danrtsey...@gmail.com> wrote:
>
>> Since you are deploying Flink workloads on Yarn, the Flink ResourceManager
>> should get the container completion event after the heartbeat chain of
>> Yarn NM -> Yarn RM -> Flink RM, which takes 8 seconds by default.
>> The Flink ResourceManager will then release the dead TaskManager's
>> container once it receives the completion event. As a result, Flink will
>> not deploy tasks onto the dead TaskManagers.
>>
>>
>> I think most of the time cost in Phase 1 might be cancelling the tasks on
>> the dead TaskManagers.
>>
>>
>> Best,
>> Yang
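
If that propagation delay matters, the Flink side of the chain (the heartbeat between the Flink ResourceManager and the Yarn ResourceManager) can be tuned. A sketch of the relevant flink-conf.yaml entry, assuming that container-completion notifications arrive via this heartbeat; the value shown is the default:
```
# flink-conf.yaml (illustrative; value shown is the default)
# Heartbeat between the Flink ResourceManager and the Yarn ResourceManager, in seconds.
yarn.heartbeat.interval: 5
```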
>>
>>
>> Till Rohrmann <trohrm...@apache.org> wrote on Thu, Jul 1, 2021 at 4:49 PM:
>>
>>> Gen's analysis is correct. Flink currently uses its heartbeat as the
>>> primary means to detect dead TaskManagers. This means that Flink will
>>> take at least `heartbeat.timeout` before the system recovers. Even if the
>>> cancellation happens fast (e.g. because a low akka.ask.timeout has been
>>> configured), Flink will still try to deploy tasks onto the dead
>>> TaskManager until it is marked as dead and its slots are released (unless
>>> the ResourceManager gets a signal from the underlying resource management
>>> system that the container/pod has died). One way to improve the situation
>>> is to introduce logic which reacts to a ConnectionException and then, for
>>> example, blacklists or releases the TaskManager. This is currently not
>>> implemented in Flink, though.
>>>
>>> Concerning the cancellation operation: Flink currently does not listen to
>>> Akka's dead letters. This means that the `akka.ask.timeout` is the primary
>>> means to fail the future result of an RPC which could not be sent. This is
>>> also an improvement we should add to Flink's RpcService. I've created a
>>> JIRA issue for this problem [1].
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-23202
>>>
>>> Cheers,
>>> Till
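
For reference, the knobs mentioned here are flink-conf.yaml settings. A sketch with their default values; lowering heartbeat.timeout shortens the detection phase, at the risk of false positives on a congested network:
```
# flink-conf.yaml (illustrative; values shown are the defaults)
heartbeat.interval: 10000   # how often heartbeat requests are sent, in ms
heartbeat.timeout: 50000    # a TaskManager is declared dead after this, in ms
akka.ask.timeout: 10 s      # timeout for ask-based RPCs such as task cancellation
```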
>>>
>>> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu <qqib...@gmail.com> wrote:
>>>
>>>> Thanks Gen! cc'ing flink-dev to collect more input.
>>>>
>>>> Best
>>>> Lu
>>>>
>>>> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo <luogen...@gmail.com> wrote:
>>>>
>>>>> I'm also wondering about this.
>>>>>
>>>>> In my opinion, it's because the JM cannot confirm whether the TM is lost
>>>>> or whether it's only temporary network trouble that will recover soon. I
>>>>> can see in the log that Akka got a "Connection refused", but the JM still
>>>>> sends heartbeat requests to the lost TM until it reaches the heartbeat
>>>>> timeout. But I'm not sure whether it's indeed designed like this.
>>>>>
>>>>> I would really appreciate it if anyone who knows more details could
>>>>> answer. Thanks.
>>>>>
>>>>
