[ https://issues.apache.org/jira/browse/FLINK-21331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zhu Zhu closed FLINK-21331.
---------------------------
Resolution: Fixed

Done via 9c95cc19bed1a8c9dddcfa3969614474ee4934c2

Optimize calculating tasks to restart in RestartPipelinedRegionFailoverStrategy
-------------------------------------------------------------------------------

Key: FLINK-21331
URL: https://issues.apache.org/jira/browse/FLINK-21331
Project: Flink
Issue Type: Sub-task
Components: Runtime / Coordination
Reporter: Zhilong Hong
Assignee: Zhilong Hong
Priority: Major
Labels: pull-request-available
Fix For: 1.13.0

RestartPipelinedRegionFailoverStrategy calculates which tasks to restart when a task fails. It works in two steps. First, it calculates the regions to restart. Then, it adds all the tasks in those regions to the restarting queue.

The bottleneck is mainly in the first step. That step traverses all the upstream and downstream regions of the failed region to decide whether each of them must restart.

For the failed region, if a result partition it consumes is not available, the producer of that partition (i.e., the upstream region) must restart as well. In addition, because the failed region restarts, the result partitions it produces will not be available, so all of its downstream regions must restart too.

1. Calculating the upstream regions that should restart

The current implementation is:
{code:java}
for each SchedulingExecutionVertex in the currently visited SchedulingPipelinedRegion:
    for each consumed SchedulingResultPartition of the SchedulingExecutionVertex:
        if the result partition is not available:
            add the producer region to the restart queue
{code}

For an all-to-all edge where both sides have parallelism N, each of the N consumer vertices consumes all N partitions, so this check runs N^2 times. Since FLINK-21328, the result partitions consumed by a vertex are already grouped. We can therefore use a HashSet to record the result partition groups that have already been visited. Vertices connected by all-to-all edges then only need to traverse the group once. This reduces the time complexity from O(N^2) to O(N).

2. Calculating the downstream regions that should restart

The current implementation is:
{code:java}
for each SchedulingExecutionVertex in the currently visited SchedulingPipelinedRegion:
    for each produced SchedulingResultPartition of the SchedulingExecutionVertex:
        for each consumer SchedulingExecutionVertex of the produced SchedulingResultPartition:
            if the region containing the consumer SchedulingExecutionVertex is not visited:
                add the region to the restart queue
{code}

The number of result partitions produced by a vertex equals its number of output JobEdges, which is usually a small constant, so the cost is dominated by the innermost loop. For an all-to-all edge, each of the N producer vertices has a partition with N consumers, so this procedure is actually O(N^2). Because the consumer vertices of a result partition are already grouped, we can use a HashSet to record the visited ConsumerVertexGroups. Each group is then traversed only once, which reduces the time complexity from O(N^2) to O(N).
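The two traversals described above can be sketched together in plain Java. This is a hypothetical, simplified model, not Flink's scheduling API or the committed implementation: the classes Region, Vertex, Partition, PartitionGroup and ConsumerGroup only loosely stand in for SchedulingPipelinedRegion, SchedulingExecutionVertex, SchedulingResultPartition, the consumed result partition group and ConsumerVertexGroup, and the `checks` counter and `grouped` flag are illustrative additions. It assumes partition availability does not change during the traversal. The topology is a single blocking all-to-all edge in which every vertex forms its own region and every upstream result partition is unavailable.

```java
import java.util.*;

public class RegionFailoverSketch {
    // Hypothetical, simplified stand-ins for Flink's scheduling topology (not the real API).
    static final class Region { final List<Vertex> vertices = new ArrayList<>(); }
    static final class PartitionGroup { final List<Partition> partitions = new ArrayList<>(); }
    static final class ConsumerGroup { final List<Vertex> vertices = new ArrayList<>(); }

    static final class Vertex {
        Region region;
        final List<PartitionGroup> consumedGroups = new ArrayList<>();
        final List<Partition> produced = new ArrayList<>();
    }

    static final class Partition {
        final Vertex producer;
        final ConsumerGroup consumers; // shared by all partitions of one all-to-all edge
        final boolean available;       // assumed fixed during the traversal
        Partition(Vertex producer, ConsumerGroup consumers, boolean available) {
            this.producer = producer;
            this.consumers = consumers;
            this.available = available;
        }
    }

    static final class Result {
        final Set<Region> regions;
        final int checks; // innermost-loop iterations, i.e. the traversal cost
        Result(Set<Region> regions, int checks) { this.regions = regions; this.checks = checks; }
    }

    /** BFS over regions; with grouped == true each group is scanned at most once. */
    static Result regionsToRestart(Region failed, boolean grouped) {
        Set<Region> visitedRegions = new HashSet<>();
        Set<PartitionGroup> visitedPartitionGroups = new HashSet<>();
        Set<ConsumerGroup> visitedConsumerGroups = new HashSet<>();
        Deque<Region> queue = new ArrayDeque<>();
        visitedRegions.add(failed);
        queue.add(failed);
        int checks = 0;
        while (!queue.isEmpty()) {
            for (Vertex v : queue.poll().vertices) {
                // 1. Upstream: an unavailable consumed partition restarts its producer's region.
                for (PartitionGroup group : v.consumedGroups) {
                    if (grouped && !visitedPartitionGroups.add(group)) continue; // already scanned
                    for (Partition p : group.partitions) {
                        checks++;
                        if (!p.available && visitedRegions.add(p.producer.region)) queue.add(p.producer.region);
                    }
                }
                // 2. Downstream: every consumer of a restarting region's output restarts.
                for (Partition p : v.produced) {
                    if (grouped && !visitedConsumerGroups.add(p.consumers)) continue; // already scanned
                    for (Vertex c : p.consumers.vertices) {
                        checks++;
                        if (visitedRegions.add(c.region)) queue.add(c.region);
                    }
                }
            }
        }
        return new Result(visitedRegions, checks);
    }

    static Vertex vertexInOwnRegion() {
        Vertex v = new Vertex();
        v.region = new Region();
        v.region.vertices.add(v);
        return v;
    }

    /** A (n vertices) --all-to-all, blocking--> B (n vertices); all A outputs unavailable. Returns B. */
    static Vertex[] buildAllToAll(int n) {
        PartitionGroup consumedByEveryB = new PartitionGroup();
        ConsumerGroup everyB = new ConsumerGroup();
        for (int i = 0; i < n; i++) {
            Vertex a = vertexInOwnRegion();
            Partition p = new Partition(a, everyB, false);
            a.produced.add(p);
            consumedByEveryB.partitions.add(p);
        }
        Vertex[] bs = new Vertex[n];
        for (int i = 0; i < n; i++) {
            bs[i] = vertexInOwnRegion();
            bs[i].consumedGroups.add(consumedByEveryB);
            everyB.vertices.add(bs[i]);
        }
        return bs;
    }

    public static void main(String[] args) {
        for (boolean grouped : new boolean[] {false, true}) {
            Result r = regionsToRestart(buildAllToAll(100)[0].region, grouped);
            System.out.println("grouped=" + grouped + ": " + r.regions.size() + " regions, " + r.checks + " checks");
        }
    }
}
```

With N = 100 and one failed downstream region, both variants select the same 200 regions, but the ungrouped traversal performs 2N^2 = 20000 inner-loop checks, while the grouped one performs 2N = 200, because each shared group is scanned once no matter how many vertices reference it.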