Zhilong Hong created FLINK-21331:
------------------------------------

             Summary: Optimize calculating tasks to restart in 
RestartPipelinedRegionFailoverStrategy
                 Key: FLINK-21331
                 URL: https://issues.apache.org/jira/browse/FLINK-21331
             Project: Flink
          Issue Type: Sub-task
          Components: Runtime / Coordination
            Reporter: Zhilong Hong
             Fix For: 1.13.0


RestartPipelinedRegionFailoverStrategy calculates the tasks to restart when a 
task failure occurs. The calculation has two parts: first, determine the 
regions to restart; then, add all the tasks in these regions to the 
restarting queue.

The bottleneck is mainly in the first part. This part traverses all the 
upstream and downstream regions of the failed region to determine whether they 
should be restarted or not.

For the current failed region, if a result partition it consumes is not 
available, the owner of that partition, i.e., the upstream region, should 
restart. Also, since the failed region needs to restart, its result partitions 
won't be available, so all the downstream regions need to restart, too.
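
The traversal described above can be sketched at the region level as a plain breadth-first search. This is only an illustration under simplifying assumptions: the `Region` class, its fields, and `regionsToRestart` are hypothetical stand-ins, not Flink's SchedulingPipelinedRegion API, and partition availability is reduced to one boolean per region-to-region edge.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class RegionTraversalSketch {

    /** Hypothetical region node; not Flink's SchedulingPipelinedRegion. */
    static final class Region {
        final String name;
        // Producer regions of the consumed partitions, with availability per edge.
        final List<Region> producers = new ArrayList<>();
        final List<Boolean> partitionAvailable = new ArrayList<>();
        // Regions that consume this region's result partitions.
        final List<Region> consumers = new ArrayList<>();

        Region(String name) {
            this.name = name;
        }

        void consumeFrom(Region producer, boolean available) {
            producers.add(producer);
            partitionAvailable.add(available);
            producer.consumers.add(this);
        }
    }

    /** Returns the names of all regions to restart, starting from the failed region. */
    static Set<String> regionsToRestart(Region failed) {
        Set<Region> visited = new LinkedHashSet<>();
        Deque<Region> queue = new ArrayDeque<>();
        visited.add(failed);
        queue.add(failed);
        while (!queue.isEmpty()) {
            Region current = queue.poll();
            // Upstream: restart the producer only if its partition is unavailable.
            for (int i = 0; i < current.producers.size(); i++) {
                Region producer = current.producers.get(i);
                if (!current.partitionAvailable.get(i) && visited.add(producer)) {
                    queue.add(producer);
                }
            }
            // Downstream: every consumer region must restart.
            for (Region consumer : current.consumers) {
                if (visited.add(consumer)) {
                    queue.add(consumer);
                }
            }
        }
        Set<String> names = new LinkedHashSet<>();
        for (Region region : visited) {
            names.add(region.name);
        }
        return names;
    }

    public static void main(String[] args) {
        Region a = new Region("A");
        Region b = new Region("B");
        Region c = new Region("C");
        b.consumeFrom(a, false); // A's partition was lost
        c.consumeFrom(b, true);
        System.out.println(regionsToRestart(b));
    }
}
```

In this sketch an upstream region whose partition is still available is never enqueued, while every downstream region is.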

1. Calculating the upstream regions that should restart

The current implementation is:
{code:java}
for each SchedulingExecutionVertex in current visited SchedulingPipelinedRegion:
  for each consumed SchedulingResultPartition of the SchedulingExecutionVertex:
    if the result partition is not available:
      add the producer region to the restart queue
{code}
Based on FLINK-21328, the consumed result partitions of a vertex are already 
grouped. We can use a HashSet to record the visited result partition groups, 
so that vertices connected with all-to-all edges, which share the same group, 
traverse that group only once. This decreases the time complexity from O(N^2) 
to O(N).
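
A minimal sketch of the proposed deduplication, assuming that vertices joined by an all-to-all edge hold a reference to the same group object. `Partition`, `PartitionGroup`, `Vertex`, and the `checks` counter are hypothetical names introduced for illustration; they are not the classes added by FLINK-21328.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class UpstreamGroupSketch {

    /** Hypothetical result partition: who produced it and whether it is available. */
    static final class Partition {
        final String producerRegion;
        final boolean available;

        Partition(String producerRegion, boolean available) {
            this.producerRegion = producerRegion;
            this.available = available;
        }
    }

    /** Hypothetical group of consumed partitions, shared by all consumers of an edge. */
    static final class PartitionGroup {
        final List<Partition> partitions;

        PartitionGroup(Partition... partitions) {
            this.partitions = Arrays.asList(partitions);
        }
    }

    /** Hypothetical vertex that only knows the groups it consumes. */
    static final class Vertex {
        final List<PartitionGroup> consumedGroups;

        Vertex(PartitionGroup... groups) {
            this.consumedGroups = Arrays.asList(groups);
        }
    }

    /** Availability checks performed by the last call; kept only for illustration. */
    static int checks;

    static Set<String> upstreamRegionsToRestart(List<Vertex> regionVertices) {
        checks = 0;
        // The classes above do not override equals/hashCode, so this set is identity-based.
        Set<PartitionGroup> visitedGroups = new HashSet<>();
        Set<String> toRestart = new LinkedHashSet<>();
        for (Vertex vertex : regionVertices) {
            for (PartitionGroup group : vertex.consumedGroups) {
                if (!visitedGroups.add(group)) {
                    continue; // group already traversed via another vertex
                }
                for (Partition partition : group.partitions) {
                    checks++;
                    if (!partition.available) {
                        toRestart.add(partition.producerRegion);
                    }
                }
            }
        }
        return toRestart;
    }

    public static void main(String[] args) {
        PartitionGroup shared = new PartitionGroup(
                new Partition("up-0", true), new Partition("up-1", false));
        List<Vertex> region = Arrays.asList(new Vertex(shared), new Vertex(shared));
        System.out.println(upstreamRegionsToRestart(region) + " checks=" + checks);
    }
}
```

With N consumer vertices sharing one group of N partitions, the inner loop runs N times in total instead of N times per vertex.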

2. Calculating the downstream regions that should restart

The current implementation is:
{code:java}
for each SchedulingExecutionVertex in current visited SchedulingPipelinedRegion:
  for each produced SchedulingResultPartition of the SchedulingExecutionVertex:
    for each consumer SchedulingExecutionVertex of the produced SchedulingResultPartition:
      if the region containing the consumer SchedulingExecutionVertex is not visited:
        add the region to the restart queue
{code}
The number of produced result partitions of a vertex equals the number of its 
output JobEdges, so the cost is dominated by iterating over the consumer 
vertices of each partition, and the time complexity of this procedure is 
actually O(N^2). As the consumer vertices of a result partition are already 
grouped, we can use a HashSet to record the visited ConsumerVertexGroup. The 
time complexity decreases from O(N^2) to O(N).
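
The downstream side can be sketched the same way. The code below assumes that partitions produced over one all-to-all edge reference the same consumer group object. `ConsumerGroup`, `Partition`, `Vertex`, and the `consumerVisits` counter are hypothetical simplifications for illustration, not Flink's ConsumerVertexGroup or its scheduling interfaces.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class DownstreamGroupSketch {

    /** Hypothetical vertex: the region it belongs to and the partitions it produces. */
    static final class Vertex {
        final String region;
        final List<Partition> produced;

        Vertex(String region, Partition... produced) {
            this.region = region;
            this.produced = Arrays.asList(produced);
        }
    }

    /** Hypothetical group of consumer vertices, shared by the partitions of one edge. */
    static final class ConsumerGroup {
        final List<Vertex> consumers;

        ConsumerGroup(Vertex... consumers) {
            this.consumers = Arrays.asList(consumers);
        }
    }

    /** Hypothetical produced partition that points at its consumer group. */
    static final class Partition {
        final ConsumerGroup consumerGroup;

        Partition(ConsumerGroup consumerGroup) {
            this.consumerGroup = consumerGroup;
        }
    }

    /** Consumer vertices inspected by the last call; kept only for illustration. */
    static int consumerVisits;

    static Set<String> downstreamRegionsToRestart(List<Vertex> failedVertices, String failedRegion) {
        consumerVisits = 0;
        Set<ConsumerGroup> visitedGroups = new HashSet<>(); // identity-based here
        Set<String> visitedRegions = new HashSet<>();
        visitedRegions.add(failedRegion);
        Set<String> toRestart = new LinkedHashSet<>();
        for (Vertex producer : failedVertices) {
            for (Partition partition : producer.produced) {
                if (!visitedGroups.add(partition.consumerGroup)) {
                    continue; // consumers already handled via another partition
                }
                for (Vertex consumer : partition.consumerGroup.consumers) {
                    consumerVisits++;
                    if (visitedRegions.add(consumer.region)) {
                        toRestart.add(consumer.region);
                    }
                }
            }
        }
        return toRestart;
    }

    public static void main(String[] args) {
        ConsumerGroup group = new ConsumerGroup(new Vertex("down-0"), new Vertex("down-1"));
        List<Vertex> failed = Arrays.asList(
                new Vertex("failed", new Partition(group)),
                new Vertex("failed", new Partition(group)));
        System.out.println(downstreamRegionsToRestart(failed, "failed"));
    }
}
```

Each consumer group is expanded once no matter how many producer partitions point at it, which is where the reduction from O(N^2) to O(N) comes from in this sketch.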



