[ https://issues.apache.org/jira/browse/FLINK-21331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhu Zhu reassigned FLINK-21331:
-------------------------------

    Assignee: Zhilong Hong

> Optimize calculating tasks to restart in RestartPipelinedRegionFailoverStrategy
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-21331
>                 URL: https://issues.apache.org/jira/browse/FLINK-21331
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>            Reporter: Zhilong Hong
>            Assignee: Zhilong Hong
>            Priority: Major
>             Fix For: 1.13.0
>
>
> RestartPipelinedRegionFailoverStrategy calculates the tasks to restart when
> a task failure occurs. It works in two steps: first, it calculates the
> regions to restart; then, it adds all the tasks in these regions to the
> restart queue.
> The bottleneck is mainly in the first step, which traverses all the upstream
> and downstream regions of the failed region to determine whether they should
> be restarted.
> For the failed region, if one of its consumed result partitions is not
> available, the owner of that partition, i.e., the upstream producer region,
> must restart as well. Also, since the failed region restarts, its produced
> result partitions will become unavailable, so all of its downstream regions
> must restart, too.
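The two steps and the restart rules above can be sketched as a breadth-first search over regions. This is a minimal sketch, not Flink's implementation: `Region`, `Partition`, and `RegionFailoverSketch` are hypothetical simplified classes, partition availability is a fixed flag, and regions are modeled directly rather than through their SchedulingExecutionVertices.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model for illustration only; these are not Flink classes.
final class Region {
    final List<String> tasks;
    final List<Partition> consumed = new ArrayList<>();
    final List<Partition> produced = new ArrayList<>();

    Region(String... tasks) {
        this.tasks = List.of(tasks);
    }

    // Registers this region as a consumer of partition p.
    void consume(Partition p) {
        consumed.add(p);
        p.consumers.add(this);
    }
}

final class Partition {
    final Region producer;
    final boolean available;
    final List<Region> consumers = new ArrayList<>();

    Partition(Region producer, boolean available) {
        this.producer = producer;
        this.available = available;
        producer.produced.add(this);
    }
}

final class RegionFailoverSketch {
    // Step 1: breadth-first search over regions; step 2: collect their tasks.
    static Set<String> tasksToRestart(Region failed) {
        Set<Region> visited = new LinkedHashSet<>();
        Deque<Region> queue = new ArrayDeque<>();
        visited.add(failed);
        queue.add(failed);
        while (!queue.isEmpty()) {
            Region region = queue.poll();
            // Upstream: an unavailable input must be regenerated by its producer.
            for (Partition p : region.consumed) {
                if (!p.available && visited.add(p.producer)) {
                    queue.add(p.producer);
                }
            }
            // Downstream: this region restarts, so its outputs become unavailable.
            for (Partition p : region.produced) {
                for (Region consumer : p.consumers) {
                    if (visited.add(consumer)) {
                        queue.add(consumer);
                    }
                }
            }
        }
        Set<String> tasks = new LinkedHashSet<>();
        for (Region r : visited) {
            tasks.addAll(r.tasks);
        }
        return tasks;
    }
}
```

For example, if region B consumes an available partition from A and an unavailable one from D, and C consumes B's output, a failure in B restarts B, D, and C but not A.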
> 1. Calculating the upstream regions that should restart
> The current implementation is:
> {code:java}
> for each SchedulingExecutionVertex in the currently visited SchedulingPipelinedRegion:
>   for each consumed SchedulingResultPartition of the SchedulingExecutionVertex:
>     if the result partition is not available:
>       add the producer region to the restart queue
> {code}
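> Based on FLINK-21328, the result partitions consumed by a vertex are already
> grouped, so we can use a HashSet to record the visited result partition
> groups. Vertices connected by all-to-all edges consume the same group, so the
> group only needs to be traversed once. With N upstream and N downstream
> vertices, the current implementation checks all N partitions for each of the
> N consumers, i.e., N^2 checks; recording visited groups decreases the time
> complexity from O(N^2) to O(N).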
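A minimal sketch of the grouping idea for the upstream check, assuming one shared group object per all-to-all edge. `PartitionGroup` and `UpstreamScan` are hypothetical names, not Flink's API, and `checks` only counts availability checks so the two approaches can be compared.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a group of result partitions shared by consumers.
// It inherits identity-based equals/hashCode from Object, which the HashSet relies on.
final class PartitionGroup {
    final int[] producerRegion; // producer region id of each partition
    final boolean[] available;  // availability of each partition

    PartitionGroup(int[] producerRegion, boolean[] available) {
        this.producerRegion = producerRegion;
        this.available = available;
    }
}

final class UpstreamScan {
    long checks; // number of partition availability checks performed

    // Current approach: every vertex re-checks every partition it consumes.
    Set<Integer> perVertex(List<List<PartitionGroup>> consumedByVertex) {
        Set<Integer> toRestart = new HashSet<>();
        for (List<PartitionGroup> groups : consumedByVertex) {
            for (PartitionGroup g : groups) {
                scan(g, toRestart);
            }
        }
        return toRestart;
    }

    // Proposed approach: a group shared by many vertices is scanned only once.
    Set<Integer> perGroup(List<List<PartitionGroup>> consumedByVertex) {
        Set<Integer> toRestart = new HashSet<>();
        Set<PartitionGroup> visitedGroups = new HashSet<>();
        for (List<PartitionGroup> groups : consumedByVertex) {
            for (PartitionGroup g : groups) {
                if (visitedGroups.add(g)) {
                    scan(g, toRestart);
                }
            }
        }
        return toRestart;
    }

    // Adds the producer region of every unavailable partition in g.
    private void scan(PartitionGroup g, Set<Integer> toRestart) {
        for (int i = 0; i < g.available.length; i++) {
            checks++;
            if (!g.available[i]) {
                toRestart.add(g.producerRegion[i]);
            }
        }
    }
}
```

With four consumer vertices sharing one group of four partitions, `perVertex` performs 16 checks while `perGroup` performs 4, and both report the same producer region to restart.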
> 2. Calculating the downstream regions that should restart
> The current implementation is:
> {code:java}
> for each SchedulingExecutionVertex in the currently visited SchedulingPipelinedRegion:
>   for each produced SchedulingResultPartition of the SchedulingExecutionVertex:
>     for each consumer SchedulingExecutionVertex of the produced SchedulingResultPartition:
>       if the region containing the consumer SchedulingExecutionVertex is not visited:
>         add the region to the restart queue
> {code}
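> A vertex produces one result partition per output JobEdge, so it has only a
> few produced partitions. With all-to-all edges, however, each partition has
> all N downstream vertices as consumers, and all N upstream vertices repeat
> the same traversal, so the time complexity of this procedure is actually
> O(N^2). As the consumer vertices of a result partition are already grouped,
> we can use a HashSet to record the visited ConsumerVertexGroups so that each
> group is traversed only once. The time complexity decreases from O(N^2) to
> O(N).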
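The same idea applies downstream. In this hypothetical sketch, `ConsumerGroup` stands in for a ConsumerVertexGroup and each vertex lists one group per produced partition; the names and the `visits` counter are illustrative only, not Flink's API.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a ConsumerVertexGroup: the consumers shared by a partition.
// It inherits identity-based equals/hashCode from Object.
final class ConsumerGroup {
    final int[] consumerRegion; // region id of each consumer vertex

    ConsumerGroup(int... consumerRegion) {
        this.consumerRegion = consumerRegion;
    }
}

final class DownstreamScan {
    long visits; // number of consumer vertices inspected

    // Current approach: each produced partition walks all of its consumers.
    Set<Integer> perPartition(List<List<ConsumerGroup>> producedByVertex) {
        Set<Integer> regions = new HashSet<>();
        for (List<ConsumerGroup> partitions : producedByVertex) {
            for (ConsumerGroup g : partitions) {
                visit(g, regions);
            }
        }
        return regions;
    }

    // Proposed approach: skip consumer groups that were already traversed.
    Set<Integer> perGroup(List<List<ConsumerGroup>> producedByVertex) {
        Set<Integer> regions = new HashSet<>();
        Set<ConsumerGroup> visitedGroups = new HashSet<>();
        for (List<ConsumerGroup> partitions : producedByVertex) {
            for (ConsumerGroup g : partitions) {
                if (visitedGroups.add(g)) {
                    visit(g, regions);
                }
            }
        }
        return regions;
    }

    // Adds the region of every consumer vertex in g to the restart set.
    private void visit(ConsumerGroup g, Set<Integer> regions) {
        for (int region : g.consumerRegion) {
            visits++;
            regions.add(region);
        }
    }
}
```

For three producer vertices whose partitions share one group of three consumers, `perPartition` inspects 9 consumer vertices and `perGroup` only 3, while both find the same downstream regions.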



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
