[jira] [Commented] (FLINK-5867) The implementation of RestartPipelinedRegionStrategy
[ https://issues.apache.org/jira/browse/FLINK-5867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15999589#comment-15999589 ]

ASF GitHub Bot commented on FLINK-5867:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3773

> The implementation of RestartPipelinedRegionStrategy
>
>                 Key: FLINK-5867
>                 URL: https://issues.apache.org/jira/browse/FLINK-5867
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager
>            Reporter: shuai.xu
>            Assignee: shuai.xu
>
> The RestartPipelinedRegionStrategy's responsibility is the following:
> 1. Calculate all FailoverRegions and their relations when initializing.
> 2. Listen for the failure of the job and executions, and find corresponding
> FailoverRegions to do the failover.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
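The two responsibilities in the issue description can be illustrated with a small, self-contained sketch. It is not Flink's implementation: the class `PipelinedRegions`, the integer vertex ids and the edge-list input are hypothetical simplifications. Regions are computed once as the connected components of the graph formed by pipelined exchanges only (edge direction ignored, and any exchange not passed in acts as a boundary), and a failed vertex is then mapped to the vertices of its region, which are the ones to restart.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical sketch, not Flink's API: vertices are ints 0..n-1 and every
 * pipelined exchange is given as a {producer, consumer} pair.
 */
final class PipelinedRegions {

    private final int[] regionOf;                          // vertex -> region index
    private final List<List<Integer>> regions = new ArrayList<>();

    PipelinedRegions(int numVertices, int[][] pipelinedEdges) {
        // Region membership ignores direction, so build an undirected adjacency list.
        List<List<Integer>> adjacent = new ArrayList<>();
        for (int v = 0; v < numVertices; v++) {
            adjacent.add(new ArrayList<>());
        }
        for (int[] edge : pipelinedEdges) {
            adjacent.get(edge[0]).add(edge[1]);
            adjacent.get(edge[1]).add(edge[0]);
        }

        // Responsibility 1: compute every region once, at initialization,
        // as a connected component found by an iterative depth-first search.
        regionOf = new int[numVertices];
        Arrays.fill(regionOf, -1);
        for (int start = 0; start < numVertices; start++) {
            if (regionOf[start] != -1) {
                continue; // already assigned to a region
            }
            int id = regions.size();
            List<Integer> members = new ArrayList<>();
            Deque<Integer> stack = new ArrayDeque<>();
            regionOf[start] = id;
            stack.push(start);
            while (!stack.isEmpty()) {
                int v = stack.pop();
                members.add(v);
                for (int w : adjacent.get(v)) {
                    if (regionOf[w] == -1) {
                        regionOf[w] = id;
                        stack.push(w);
                    }
                }
            }
            regions.add(members);
        }
    }

    int regionCount() {
        return regions.size();
    }

    /** Responsibility 2: map a failed vertex to the vertices of its region. */
    List<Integer> verticesToRestart(int failedVertex) {
        return Collections.unmodifiableList(regions.get(regionOf[failedVertex]));
    }

    public static void main(String[] args) {
        // 0 -pipelined-> 1 -blocking-> 2 -pipelined-> 3; the blocking edge is not listed.
        PipelinedRegions r = new PipelinedRegions(4, new int[][] {{0, 1}, {2, 3}});
        System.out.println(r.regionCount());        // 2
        System.out.println(r.verticesToRestart(3)); // [2, 3]
    }
}
```

With this layout a failure of vertex 3 restarts only vertices 2 and 3, while vertices 0 and 1 keep running.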
[ https://issues.apache.org/jira/browse/FLINK-5867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15999455#comment-15999455 ]

ASF GitHub Bot commented on FLINK-5867:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3773

Thank you @shuai-xu for checking this. Merging this for 1.3, together with the updates you proposed.

I think we initially need to make a small change so that non-pipelined exchanges are NOT boundaries of failover regions, until we have the enhancement in place that triggers an upstream region restart when a result cannot be fetched.
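A minimal sketch of the interim rule proposed in this comment, assuming a hypothetical two-valued `ExchangeType` rather than Flink's actual result partition types: a blocking exchange only separates regions once a restart can be propagated upstream when a result cannot be fetched; until then, every exchange keeps its producer and consumer in the same region. For a linear chain of vertices, the region count is simply the number of boundary edges plus one.

```java
/**
 * Hypothetical sketch of the interim boundary rule; ExchangeType is a stand-in,
 * not Flink's result partition type.
 */
final class RegionBoundaryRule {

    enum ExchangeType { PIPELINED, BLOCKING }

    /**
     * A pipelined exchange never separates regions. A blocking exchange does so only
     * when a consumer failure can trigger a restart of the upstream region whose
     * result could not be fetched.
     */
    static boolean isBoundary(ExchangeType type, boolean upstreamRestartSupported) {
        return type == ExchangeType.BLOCKING && upstreamRestartSupported;
    }

    /** For a linear chain of vertices, the region count is (boundary edges + 1). */
    static int regionsInChain(ExchangeType[] edges, boolean upstreamRestartSupported) {
        int regions = 1;
        for (ExchangeType edge : edges) {
            if (isBoundary(edge, upstreamRestartSupported)) {
                regions++;
            }
        }
        return regions;
    }

    public static void main(String[] args) {
        // A -pipelined-> B -blocking-> C -pipelined-> D
        ExchangeType[] chain = {ExchangeType.PIPELINED, ExchangeType.BLOCKING, ExchangeType.PIPELINED};
        System.out.println(regionsInChain(chain, false)); // 1: interim rule, the whole chain restarts
        System.out.println(regionsInChain(chain, true));  // 2: {A, B} and {C, D}
    }
}
```

The interim setting trades restart scope for safety: without upstream restart, a failover confined to {C, D} could get stuck if B's blocking result had been lost.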
[ https://issues.apache.org/jira/browse/FLINK-5867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984131#comment-15984131 ]

ASF GitHub Bot commented on FLINK-5867:
---

Github user shuai-xu commented on a diff in the pull request:

https://github.com/apache/flink/pull/3773#discussion_r113361061

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java ---
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * FailoverRegion manages the failover of a minimal pipeline connected sub graph.
+ * It will change from CREATED to CANCELING and then to CANCELLED and at last to RUNNING,
+ */
+public class FailoverRegion {
+
+	private static final AtomicReferenceFieldUpdater<FailoverRegion, JobStatus> STATE_UPDATER =
+			AtomicReferenceFieldUpdater.newUpdater(FailoverRegion.class, JobStatus.class, "state");
+
+	/** The log object used for debugging. */
+	private static final Logger LOG = LoggerFactory.getLogger(FailoverRegion.class);
+
+	//
+
+	/** a unique id for debugging */
+	private final AbstractID id = new AbstractID();
+
+	private final ExecutionGraph executionGraph;
+
+	private final List<ExecutionVertex> connectedExecutionVertexes;
+
+	/** The executor that executes the recovery action after all vertices are in a */
+	private final Executor executor;
+
+	/** Current status of the job execution */
+	private volatile JobStatus state = JobStatus.RUNNING;
+
+
+	public FailoverRegion(ExecutionGraph executionGraph, Executor executor, List<ExecutionVertex> connectedExecutions) {
+		this.executionGraph = checkNotNull(executionGraph);
+		this.executor = checkNotNull(executor);
+		this.connectedExecutionVertexes = checkNotNull(connectedExecutions);
+
+		LOG.debug("Created failover region {} with vertices: {}", id, connectedExecutions);
+	}
+
+	public void onExecutionFail(Execution taskExecution, Throwable cause) {
+		// TODO: check if need to failover the preceding region
+		if (!executionGraph.getRestartStrategy().canRestart()) {
+			// delegate the failure to a global fail that will check the restart strategy and not restart
+			executionGraph.failGlobal(cause);
+		}
+		else {
+			cancel(taskExecution.getGlobalModVersion());
+		}
+	}
+
+	private void allVerticesInTerminalState(long globalModVersionOfFailover) {
+		while (true) {
+			JobStatus curStatus = this.state;
+			if (curStatus.equals(JobStatus.CANCELLING)) {
+
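The `STATE_UPDATER` field in the diff uses `AtomicReferenceFieldUpdater` to change a region's state by compare-and-set on a `volatile` field, which lets only one of several concurrent callers win a given transition. The sketch below shows that pattern in isolation. `RegionStateMachine` and its `State` enum are hypothetical stand-ins for `FailoverRegion` and `JobStatus`, and the cycle RUNNING → CANCELLING → CANCELED → CREATED → RUNNING is an assumption based on the initial `RUNNING` value and the `CANCELLING` check visible in the (truncated) diff.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/** Hypothetical stand-in for FailoverRegion's state handling, not Flink code. */
final class RegionStateMachine {

    /** Hypothetical states; the class in the diff reuses Flink's JobStatus instead. */
    enum State { CREATED, RUNNING, CANCELLING, CANCELED }

    private static final AtomicReferenceFieldUpdater<RegionStateMachine, State> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(RegionStateMachine.class, State.class, "state");

    // The updater requires a volatile, non-static field of exactly the declared type.
    private volatile State state = State.RUNNING;

    /** Atomically moves from 'expected' to 'next'; returns false if the state was not 'expected'. */
    boolean transition(State expected, State next) {
        return STATE_UPDATER.compareAndSet(this, expected, next);
    }

    State state() {
        return state;
    }

    public static void main(String[] args) {
        RegionStateMachine region = new RegionStateMachine();
        // A task failure starts cancellation; a second failure arriving later loses the CAS.
        System.out.println(region.transition(State.RUNNING, State.CANCELLING));  // true
        System.out.println(region.transition(State.RUNNING, State.CANCELLING));  // false
        // All vertices terminal -> canceled; reset -> created; reschedule -> running.
        System.out.println(region.transition(State.CANCELLING, State.CANCELED)); // true
        System.out.println(region.transition(State.CANCELED, State.CREATED));    // true
        System.out.println(region.transition(State.CREATED, State.RUNNING));     // true
    }
}
```

A field updater is used instead of an `AtomicReference` field, presumably to avoid one extra object per region; the compare-and-set semantics are the same.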
[ https://issues.apache.org/jira/browse/FLINK-5867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983350#comment-15983350 ]

ASF GitHub Bot commented on FLINK-5867:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/3773

[FLINK-5867] [FLINK-5866] [flip-1] Implement FailoverStrategy for pipelined regions

This is based on #3772; the relevant commits are the last four.

The majority of the work was done by @tiemsn, with some rebasing and additions from me.

# Pipelined Region Failover

As described in [FLIP-1](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures), this pull request implements the detection of pipelined regions in the `ExecutionGraph` and failover within these pipelined regions.

![st0-nzqia5abpwrgaogpllw](https://cloud.githubusercontent.com/assets/1727146/25399938/54fda5a4-29f1-11e7-9efe-5d845644089f.png)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink flip-1-pipelined-regions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3773.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #3773

commit ef7fd9964c1c74feb4641e57a138c54558b2449c
Author: Stephan Ewen
Date: 2017-03-21T18:13:34Z

[FLINK-5869] [flip-1] Add basic abstraction for Failover Strategies to ExecutionGraph

- Rename 'ExecutionGraph.fail()' to 'ExecutionGraph.failGlobally()' to differentiate from fine grained failures/recovery
- Add base class for FailoverStrategy
- Add default implementation (restart all tasks)
- Add logic to load the failover strategy from the configuration

commit c04a8a312098fddce14e392b8d9dbf396b1df3f3
Author: Stephan Ewen
Date: 2017-03-29T20:49:54Z

[FLINK-6340] [flip-1] Add a termination future to the Execution

commit 92d3f7e1025dc3c3499730bda8e8a9acfd3b5c13
Author: shuai.xus
Date: 2017-04-18T06:15:29Z

[FLINK-5867] [flip-1] Support restarting only pipelined sub-regions of the ExecutionGraph on task failure

commit 456600d5e37724bbcc7d570f6828e3fef6298483
Author: shuai.xus
Date: 2017-04-20T21:56:53Z

[FLINK-5867] [flip-1] Add tests for pipelined failover region construction

commit 622f07e0efc82bf13f12ae1960a35ecc48c865c1
Author: Stephan Ewen
Date: 2017-04-20T22:02:19Z

[FLINK-5867] [flip-1] Improve performance of Pipelined Failover Region construction

This method exploits the fact that vertices are already in topological order.

commit 39402583df8b4c51016c72f968772cbbdd6c92e3
Author: shuai.xus
Date: 2017-04-25T07:42:48Z

[FLINK-5867] [flip-1] Correct some JavaDocs for RestartIndividualStrategy
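Commit 622f07e notes that region construction can exploit the topological order of the vertices. A hedged sketch of why that helps: when vertices are visited in topological order, every producer is visited before its consumers, so each vertex can be merged into its pipelined producers' regions in a single forward pass, with no separate graph traversal. The class `TopologicalRegionBuilder` and its input format (vertex ids equal to topological positions, one list of pipelined producer ids per vertex) are hypothetical; Flink's actual code may differ in data structures and merge details.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch, not Flink's API. Vertex ids are assumed to be positions
 * in a topological order, so every producer id is smaller than its consumer id.
 */
final class TopologicalRegionBuilder {

    /** pipelinedInputs[v] lists the producers connected to v by pipelined exchanges. */
    static List<List<Integer>> build(int[][] pipelinedInputs) {
        int n = pipelinedInputs.length;
        List<List<Integer>> regionOf = new ArrayList<>(n); // vertex -> shared region list
        for (int v = 0; v < n; v++) {
            List<Integer> own = new ArrayList<>();
            own.add(v);
            regionOf.add(own);
            for (int p : pipelinedInputs[v]) {
                // p < v, so p already has a region: no lookahead is needed.
                List<Integer> mine = regionOf.get(v);
                List<Integer> theirs = regionOf.get(p);
                if (mine == theirs) {
                    continue; // already merged through another input
                }
                // Merge the smaller region into the larger one and repoint its members.
                List<Integer> big = theirs.size() >= mine.size() ? theirs : mine;
                List<Integer> small = (big == theirs) ? mine : theirs;
                big.addAll(small);
                for (int member : small) {
                    regionOf.set(member, big);
                }
            }
        }
        // Collect distinct regions by identity, in order of first appearance.
        Set<List<Integer>> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        List<List<Integer>> result = new ArrayList<>();
        for (List<Integer> region : regionOf) {
            if (seen.add(region)) {
                result.add(region);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // 0 -pipelined-> 1 -blocking-> 2 -pipelined-> 3 (the blocking edge is simply not listed)
        System.out.println(build(new int[][] {{}, {0}, {}, {2}})); // [[0, 1], [2, 3]]
    }
}
```

Merging the smaller region into the larger one bounds how often any single vertex is repointed, which keeps the pass cheap even for wide, densely connected graphs.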