[jira] [Commented] (FLINK-5867) The implementation of RestartPipelinedRegionStrategy

2017-05-06 Thread ASF GitHub Bot (JIRA)

[https://issues.apache.org/jira/browse/FLINK-5867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15999589#comment-15999589]

ASF GitHub Bot commented on FLINK-5867:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3773


> The implementation of RestartPipelinedRegionStrategy
> 
>
> Key: FLINK-5867
> URL: https://issues.apache.org/jira/browse/FLINK-5867
> Project: Flink
> Issue Type: Sub-task
> Components: JobManager
> Reporter: shuai.xu
> Assignee: shuai.xu
>
> The RestartPipelinedRegionStrategy's responsibilities are as follows:
> 1. Calculate all FailoverRegions and their relations during initialization.
> 2. Listen for failures of the job and its executions, and find the corresponding FailoverRegions to perform the failover.
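The two responsibilities listed above can be illustrated with a small, self-contained sketch. It assumes vertices are plain integer ids and that each exchange is either pipelined or blocking; `PipelinedRegionSketch` and `Edge` are hypothetical names, not Flink classes. It uses union-find to merge vertices joined by pipelined edges, treats every blocking exchange as a region boundary (the FLIP-1 target design, not the interim rule from the 2017-05-06 comment), and omits the relations between regions that the real strategy also tracks.

```java
import java.util.*;

/**
 * Hypothetical sketch (not Flink's code): groups vertices connected by
 * pipelined exchanges into failover regions with union-find, then answers
 * "which region does this vertex belong to?" when a task fails.
 */
class PipelinedRegionSketch {

    /** A data exchange between two vertices (illustrative type). */
    static final class Edge {
        final int producer;
        final int consumer;
        final boolean pipelined;

        Edge(int producer, int consumer, boolean pipelined) {
            this.producer = producer;
            this.consumer = consumer;
            this.pipelined = pipelined;
        }
    }

    private final int[] parent;

    PipelinedRegionSketch(int numVertices) {
        parent = new int[numVertices];
        for (int i = 0; i < numVertices; i++) {
            parent[i] = i; // every vertex starts in its own region
        }
    }

    private int find(int v) {
        while (parent[v] != v) {
            parent[v] = parent[parent[v]]; // path halving keeps trees shallow
            v = parent[v];
        }
        return v;
    }

    private void union(int a, int b) {
        int rootA = find(a);
        int rootB = find(b);
        if (rootA != rootB) {
            parent[rootB] = rootA;
        }
    }

    /** Responsibility 1: compute all regions once, at initialization. */
    Map<Integer, Set<Integer>> computeRegions(List<Edge> edges) {
        for (Edge e : edges) {
            if (e.pipelined) {
                union(e.producer, e.consumer); // pipelined edges never cross a region boundary
            }
        }
        Map<Integer, Set<Integer>> regions = new TreeMap<>();
        for (int v = 0; v < parent.length; v++) {
            regions.computeIfAbsent(find(v), root -> new TreeSet<>()).add(v);
        }
        return regions;
    }

    /** Responsibility 2: find the region to restart when a vertex fails. */
    Set<Integer> regionOf(int failedVertex, Map<Integer, Set<Integer>> regions) {
        return regions.get(find(failedVertex));
    }
}
```

For vertices 0 to 3 with a pipelined edge 0 -> 1, a blocking edge 1 -> 2 and a pipelined edge 2 -> 3, this yields the regions {0, 1} and {2, 3}, and a failure of vertex 3 maps to {2, 3}.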



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5867) The implementation of RestartPipelinedRegionStrategy

2017-05-06 Thread ASF GitHub Bot (JIRA)

[https://issues.apache.org/jira/browse/FLINK-5867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15999455#comment-15999455]

ASF GitHub Bot commented on FLINK-5867:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3773
  
Thank you @shuai-xu for checking this. Merging this for 1.3, together with the updates you proposed.

I think we initially need to make a small change so that non-pipelined exchanges are NOT boundaries of failover regions, until we have the enhancement that triggers an upstream region restart when a result cannot be fetched.
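A minimal sketch of what this interim rule changes, assuming integer vertex ids and one flag per edge marking it pipelined or blocking. The hypothetical `blockingIsBoundary` parameter decides whether a blocking exchange splits regions. With it set to `false`, as proposed here, a blocking exchange merges its producer and consumer into one region, so a downstream failure also restarts the producer instead of depending on a result that may no longer be fetchable. `RegionBoundarySketch` is an illustrative name, not a Flink class.

```java
import java.util.*;

/**
 * Hypothetical sketch: counts failover regions as connected components,
 * where a blocking edge splits regions only if blockingIsBoundary is true.
 */
class RegionBoundarySketch {

    static int countRegions(int numVertices, int[][] edges, boolean[] pipelined,
                            boolean blockingIsBoundary) {
        List<List<Integer>> adjacency = new ArrayList<>();
        for (int i = 0; i < numVertices; i++) {
            adjacency.add(new ArrayList<>());
        }
        for (int i = 0; i < edges.length; i++) {
            // A blocking edge joins its endpoints unless it is treated as a boundary.
            if (pipelined[i] || !blockingIsBoundary) {
                adjacency.get(edges[i][0]).add(edges[i][1]);
                adjacency.get(edges[i][1]).add(edges[i][0]);
            }
        }
        boolean[] seen = new boolean[numVertices];
        int regions = 0;
        for (int start = 0; start < numVertices; start++) {
            if (seen[start]) {
                continue;
            }
            regions++; // an unvisited vertex opens a new region
            Deque<Integer> stack = new ArrayDeque<>();
            stack.push(start);
            seen[start] = true;
            while (!stack.isEmpty()) {
                int v = stack.pop();
                for (int w : adjacency.get(v)) {
                    if (!seen[w]) {
                        seen[w] = true;
                        stack.push(w);
                    }
                }
            }
        }
        return regions;
    }
}
```

For three vertices with a pipelined edge 0 -> 1 and a blocking edge 1 -> 2, `countRegions` returns 2 when the blocking edge is a boundary and 1 under the interim rule.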




[jira] [Commented] (FLINK-5867) The implementation of RestartPipelinedRegionStrategy

2017-04-25 Thread ASF GitHub Bot (JIRA)

[https://issues.apache.org/jira/browse/FLINK-5867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984131#comment-15984131]

ASF GitHub Bot commented on FLINK-5867:
---

Github user shuai-xu commented on a diff in the pull request:

https://github.com/apache/flink/pull/3773#discussion_r113361061
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java ---
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * FailoverRegion manages the failover of a minimal pipeline connected sub graph.
+ * It will change from CREATED to CANCELLING and then to CANCELED and at last to RUNNING.
+ */
+public class FailoverRegion {
+
+   private static final AtomicReferenceFieldUpdater<FailoverRegion, JobStatus> STATE_UPDATER =
+           AtomicReferenceFieldUpdater.newUpdater(FailoverRegion.class, JobStatus.class, "state");
+
+   /** The log object used for debugging. */
+   private static final Logger LOG = LoggerFactory.getLogger(FailoverRegion.class);
+
+   // ------------------------------------------------------------------------
+
+   /** a unique id for debugging */
+   private final AbstractID id = new AbstractID();
+
+   private final ExecutionGraph executionGraph;
+
+   private final List<ExecutionVertex> connectedExecutionVertexes;
+
+   /** The executor that executes the recovery action after all vertices are in a terminal state */
+   private final Executor executor;
+
+   /** Current status of the job execution */
+   private volatile JobStatus state = JobStatus.RUNNING;
+
+
+   public FailoverRegion(ExecutionGraph executionGraph, Executor executor, List<ExecutionVertex> connectedExecutions) {
+       this.executionGraph = checkNotNull(executionGraph);
+       this.executor = checkNotNull(executor);
+       this.connectedExecutionVertexes = checkNotNull(connectedExecutions);
+
+       LOG.debug("Created failover region {} with vertices: {}", id, connectedExecutions);
+   }
+
+   public void onExecutionFail(Execution taskExecution, Throwable cause) {
+       // TODO: check if need to failover the preceding region
+       if (!executionGraph.getRestartStrategy().canRestart()) {
+           // delegate the failure to a global fail that will check the restart strategy and not restart
+           executionGraph.failGlobal(cause);
+       }
+       else {
+           cancel(taskExecution.getGlobalModVersion());
+       }
+   }
+
+   private void allVerticesInTerminalState(long globalModVersionOfFailover) {
+       while (true) {
+           JobStatus curStatus = this.state;
+           if (curStatus.equals(JobStatus.CANCELLING)) {
+ 
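The `STATE_UPDATER` field in the diff follows a common lock-free pattern: a `volatile` state field updated by compare-and-set, so that concurrent failure and cancellation callbacks cannot both win the same transition. The sketch below shows that pattern in isolation. `RegionStateSketch` and its `RegionState` enum are hypothetical stand-ins for `FailoverRegion` and `JobStatus`, and which transitions are permitted is an assumption for illustration.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/**
 * Hypothetical sketch of CAS-based state transitions in the style of the
 * FailoverRegion diff. RegionState stands in for JobStatus.
 */
class RegionStateSketch {

    enum RegionState { RUNNING, CANCELLING, CANCELED, CREATED }

    private static final AtomicReferenceFieldUpdater<RegionStateSketch, RegionState> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(RegionStateSketch.class, RegionState.class, "state");

    // The updater requires a volatile, non-static field of exactly this type.
    private volatile RegionState state = RegionState.RUNNING;

    RegionState getState() {
        return state;
    }

    /** Atomically moves from expected to next; returns false if the state was something else. */
    boolean transitionState(RegionState expected, RegionState next) {
        return STATE_UPDATER.compareAndSet(this, expected, next);
    }

    /** Starts cancellation unless it is already under way, retrying if a CAS loses a race. */
    boolean cancel() {
        while (true) {
            RegionState current = state;
            if (current == RegionState.CANCELLING || current == RegionState.CANCELED) {
                return false; // another caller already cancelled this region
            }
            if (transitionState(current, RegionState.CANCELLING)) {
                return true;
            }
            // The state changed between the read and the CAS: re-read and retry.
        }
    }
}
```

The retry loop mirrors the `while (true)` loop in `allVerticesInTerminalState`: read the current state, decide, and only commit if nobody changed it in between.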

[jira] [Commented] (FLINK-5867) The implementation of RestartPipelinedRegionStrategy

2017-04-25 Thread ASF GitHub Bot (JIRA)

[https://issues.apache.org/jira/browse/FLINK-5867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983350#comment-15983350]

ASF GitHub Bot commented on FLINK-5867:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/3773

[FLINK-5867] [FLINK-5866] [flip-1] Implement FailoverStrategy for pipelined regions

This is based on #3772; the relevant commits are the latter four.

The majority of the work has been done by @tiemsn, with some rebasing and additions from me.

# Pipelined Region Failover

As described in [FLIP-1](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures), this pull request implements the detection of pipelined regions in the `ExecutionGraph` and failover within these pipelined regions.


![st0-nzqia5abpwrgaogpllw](https://cloud.githubusercontent.com/assets/1727146/25399938/54fda5a4-29f1-11e7-9efe-5d845644089f.png)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink flip-1-pipelined-regions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3773.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3773


commit ef7fd9964c1c74feb4641e57a138c54558b2449c
Author: Stephan Ewen 
Date:   2017-03-21T18:13:34Z

[FLINK-5869] [flip-1] Add basic abstraction for Failover Strategies to ExecutionGraph

  - Rename 'ExecutionGraph.fail()' to 'ExecutionGraph.failGlobally()' to differentiate it from fine-grained failures/recovery
  - Add base class for FailoverStrategy
  - Add default implementation (restart all tasks)
  - Add logic to load the failover strategy from the configuration
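A rough sketch of the shape this commit describes: a base class, a default restart-all implementation, and loading from configuration. Every name here is hypothetical, including the `failover-strategy` key and its `full` / `individual` values; they are not Flink's actual class names or configuration options. The individual-restart variant is included only to show a second strategy behind the same interface.

```java
import java.util.*;

/**
 * Hypothetical sketch of a failover strategy abstraction. Class names, the
 * "failover-strategy" key and its values are illustrative only.
 */
abstract class FailoverStrategySketch {

    /** Returns the vertices to restart when failedVertex fails. */
    abstract Set<Integer> verticesToRestart(int failedVertex);

    /** Short name used in the configuration. */
    abstract String name();

    /** Default behaviour: any task failure restarts every vertex. */
    static final class RestartAll extends FailoverStrategySketch {
        private final int numVertices;

        RestartAll(int numVertices) {
            this.numVertices = numVertices;
        }

        @Override
        Set<Integer> verticesToRestart(int failedVertex) {
            Set<Integer> all = new TreeSet<>();
            for (int v = 0; v < numVertices; v++) {
                all.add(v);
            }
            return all;
        }

        @Override
        String name() {
            return "full";
        }
    }

    /** Finer-grained alternative: restart only the failed vertex. */
    static final class RestartIndividual extends FailoverStrategySketch {
        @Override
        Set<Integer> verticesToRestart(int failedVertex) {
            return Collections.singleton(failedVertex);
        }

        @Override
        String name() {
            return "individual";
        }
    }

    /** Picks a strategy from the config; a missing key selects the default. */
    static FailoverStrategySketch load(Map<String, String> config, int numVertices) {
        String value = config.getOrDefault("failover-strategy", "full");
        switch (value) {
            case "full":
                return new RestartAll(numVertices);
            case "individual":
                return new RestartIndividual();
            default:
                throw new IllegalArgumentException("Unknown failover strategy: " + value);
        }
    }
}
```

An empty configuration selects the restart-all default, so a failure of any vertex restarts every vertex; an unknown value fails fast with an exception rather than silently falling back.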

commit c04a8a312098fddce14e392b8d9dbf396b1df3f3
Author: Stephan Ewen 
Date:   2017-03-29T20:49:54Z

[FLINK-6340] [flip-1] Add a termination future to the Execution
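A termination future lets a failover region wait until all of its executions have actually stopped before restarting them. The sketch below uses `java.util.concurrent.CompletableFuture` in place of the `org.apache.flink.runtime.concurrent.Future` type imported in the diff; `ExecutionSketch` and `allTerminated` are hypothetical names.

```java
import java.util.*;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical sketch: each execution exposes a future that completes with
 * its terminal state. CompletableFuture stands in for Flink's own Future type.
 */
class ExecutionSketch {

    enum State { CANCELED, FAILED, FINISHED }

    private final CompletableFuture<State> terminationFuture = new CompletableFuture<>();

    /** Completes once the execution reaches a terminal state. */
    CompletableFuture<State> getTerminationFuture() {
        return terminationFuture;
    }

    /** Records the terminal state; only the first call has an effect. */
    boolean markTerminal(State terminalState) {
        return terminationFuture.complete(terminalState);
    }

    /** Completes when every given execution has reached a terminal state. */
    static CompletableFuture<Void> allTerminated(List<ExecutionSketch> executions) {
        CompletableFuture<?>[] futures = executions.stream()
                .map(ExecutionSketch::getTerminationFuture)
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures);
    }
}
```

A region could then schedule its recovery with something like `allTerminated(executions).thenRunAsync(restartAction, executor)`, where `restartAction` is a hypothetical `Runnable`. Completing each future only once means a late, conflicting state report cannot overwrite the first terminal state.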

commit 92d3f7e1025dc3c3499730bda8e8a9acfd3b5c13
Author: shuai.xus 
Date:   2017-04-18T06:15:29Z

[FLINK-5867] [flip-1] Support restarting only pipelined sub-regions of the ExecutionGraph on task failure
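The decision made on a task failure can be sketched as follows, assuming the regions have already been computed. It mirrors the branch in `FailoverRegion.onExecutionFail` from the diff: if the restart strategy does not allow a restart, the failure is escalated to a global failure; otherwise only the failed vertex's region is restarted. `RegionFailoverSketch`, `Decision` and `Action` are hypothetical names.

```java
import java.util.*;

/**
 * Hypothetical sketch of region-scoped failover over precomputed regions:
 * restart only the failed vertex's region, or fail globally if restarts
 * are not allowed.
 */
class RegionFailoverSketch {

    enum Action { RESTART_REGION, FAIL_GLOBAL }

    /** Outcome of handling one task failure (illustrative type). */
    static final class Decision {
        final Action action;
        final Set<Integer> verticesToRestart;

        Decision(Action action, Set<Integer> verticesToRestart) {
            this.action = action;
            this.verticesToRestart = verticesToRestart;
        }
    }

    private final Map<Integer, Set<Integer>> regionOfVertex = new HashMap<>();

    RegionFailoverSketch(List<Set<Integer>> regions) {
        for (Set<Integer> region : regions) {
            for (int vertex : region) {
                regionOfVertex.put(vertex, Collections.unmodifiableSet(region));
            }
        }
    }

    Decision onTaskFailure(int failedVertex, boolean canRestart) {
        if (!canRestart) {
            // The restart strategy forbids restarts, so fail the whole job globally.
            return new Decision(Action.FAIL_GLOBAL, Collections.<Integer>emptySet());
        }
        Set<Integer> region = regionOfVertex.get(failedVertex);
        if (region == null) {
            throw new IllegalArgumentException("Unknown vertex: " + failedVertex);
        }
        return new Decision(Action.RESTART_REGION, region);
    }
}
```

Vertices outside the failed region are left running, which is the point of fine-grained recovery compared to restarting the whole `ExecutionGraph`.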

commit 456600d5e37724bbcc7d570f6828e3fef6298483
Author: shuai.xus 
Date:   2017-04-20T21:56:53Z

[FLINK-5867] [flip-1] Add tests for pipelined failover region construction

commit 622f07e0efc82bf13f12ae1960a35ecc48c865c1
Author: Stephan Ewen 
Date:   2017-04-20T22:02:19Z

[FLINK-5867] [flip-1] Improve performance of Pipelined Failover Region construction

This method exploits the fact that vertices are already in topological order.
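One way to exploit the topological order, sketched under assumptions: because every producer is visited before its consumers, each vertex can join its pipelined producers' region (merging regions when it has several) in a single pass, with no separate graph traversal. `TopologicalRegionBuilder` and the input encoding (`pipelinedInputs.get(v)` lists only the producers connected to `v` by a pipelined exchange) are hypothetical, and this is not the code from the commit.

```java
import java.util.*;

/**
 * Hypothetical sketch (not the commit's code): builds failover regions in one
 * pass over vertices already in topological order. pipelinedInputs.get(v)
 * lists producers connected to v by a pipelined exchange; blocking inputs are omitted.
 */
class TopologicalRegionBuilder {

    static List<Set<Integer>> buildRegions(List<List<Integer>> pipelinedInputs) {
        int n = pipelinedInputs.size();
        List<Set<Integer>> regionOf = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            regionOf.add(null);
        }

        for (int v = 0; v < n; v++) {
            Set<Integer> region = null;
            for (int producer : pipelinedInputs.get(v)) {
                // Topological order guarantees every producer already has a region.
                if (producer < 0 || producer >= v) {
                    throw new IllegalArgumentException("inputs not in topological order at vertex " + v);
                }
                Set<Integer> producerRegion = regionOf.get(producer);
                if (region == null) {
                    region = producerRegion;
                } else if (region != producerRegion) {
                    // Merge the smaller region into the larger one and repoint its members.
                    Set<Integer> larger = region.size() >= producerRegion.size() ? region : producerRegion;
                    Set<Integer> smaller = (larger == region) ? producerRegion : region;
                    larger.addAll(smaller);
                    for (int member : smaller) {
                        regionOf.set(member, larger);
                    }
                    region = larger;
                }
            }
            if (region == null) {
                region = new HashSet<>(); // no pipelined producers: start a new region
            }
            region.add(v);
            regionOf.set(v, region);
        }

        // Several vertices share one set object, so deduplicate by identity.
        Set<Set<Integer>> distinct = Collections.newSetFromMap(new IdentityHashMap<>());
        distinct.addAll(regionOf);
        return new ArrayList<>(distinct);
    }
}
```

Merging the smaller set into the larger one means each vertex is moved at most O(log n) times, because the set it lands in is at least twice as large as the one it left.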

commit 39402583df8b4c51016c72f968772cbbdd6c92e3
Author: shuai.xus 
Date:   2017-04-25T07:42:48Z

[FLINK-5867] [flip-1] Correct some JavaDocs for RestartIndividualStrategy



