[jira] [Commented] (FLINK-3187) Decouple restart strategy from ExecutionGraph

2016-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147444#comment-15147444
 ] 

ASF GitHub Bot commented on FLINK-3187:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1470


> Decouple restart strategy from ExecutionGraph
> -
>
> Key: FLINK-3187
> URL: https://issues.apache.org/jira/browse/FLINK-3187
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> Currently, the {{ExecutionGraph}} supports the following restart logic: 
> Whenever a failure occurs and the number of restart attempts is not depleted, 
> wait for a fixed amount of time and then try to restart. This behaviour can 
> be controlled by the configuration parameters {{execution-retries.default}} 
> and {{execution-retries.delay}}.
> I propose to decouple the restart logic from the {{ExecutionGraph}} a bit by 
> introducing a strategy pattern. This would not only allow us to define 
> a job-specific restart behaviour but also to implement different restart 
> strategies. Conceivable strategies could be: fixed timeout restart, 
> exponential backoff restart, partial topology restarts, etc.
> This change is a preliminary step towards having a restart strategy which 
> will scale the parallelism of a job down in case not enough slots are 
> available.
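
A minimal sketch of the proposed strategy pattern, assuming a deliberately simplified interface. `RestartPolicy`, `FixedDelayPolicy`, `ExponentialBackoffPolicy` and `RestartDemo` are hypothetical names chosen for illustration, not Flink's API: the fixed-delay policy mirrors the current behaviour, and the exponential backoff policy is one of the alternatives the proposal mentions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical strategy interface: the caller only asks whether it may restart
// and how long to wait; the policy itself lives in the implementation.
interface RestartPolicy {
    /** Returns true if another restart attempt is allowed. */
    boolean canRestart();

    /** Consumes one attempt and returns the delay (ms) to wait before restarting. */
    long nextDelayMillis();
}

// Equivalent of the current behaviour: N attempts with a constant delay.
final class FixedDelayPolicy implements RestartPolicy {
    private final int maxAttempts;
    private final long delayMillis;
    private int attempts;

    FixedDelayPolicy(int maxAttempts, long delayMillis) {
        if (maxAttempts < 0 || delayMillis < 0) {
            throw new IllegalArgumentException("attempts and delay must be non-negative");
        }
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
    }

    @Override public boolean canRestart() { return attempts < maxAttempts; }

    @Override public long nextDelayMillis() {
        attempts++;
        return delayMillis;
    }
}

// One of the "conceivable strategies": the delay doubles after every attempt, up to a cap.
final class ExponentialBackoffPolicy implements RestartPolicy {
    private final int maxAttempts;
    private final long maxDelayMillis;
    private long currentDelayMillis;
    private int attempts;

    ExponentialBackoffPolicy(int maxAttempts, long initialDelayMillis, long maxDelayMillis) {
        if (maxAttempts < 0 || initialDelayMillis < 0 || maxDelayMillis < 0) {
            throw new IllegalArgumentException("arguments must be non-negative");
        }
        this.maxAttempts = maxAttempts;
        this.maxDelayMillis = maxDelayMillis;
        this.currentDelayMillis = Math.min(initialDelayMillis, maxDelayMillis);
    }

    @Override public boolean canRestart() { return attempts < maxAttempts; }

    @Override public long nextDelayMillis() {
        attempts++;
        long delay = currentDelayMillis;
        // Double for the next attempt without overflowing past the cap.
        currentDelayMillis = currentDelayMillis > maxDelayMillis / 2
                ? maxDelayMillis
                : currentDelayMillis * 2;
        return delay;
    }
}

final class RestartDemo {
    // Stand-in for the graph's failure handling: it consults the injected policy
    // instead of hard-coding "retry N times with a fixed delay".
    static List<Long> delaysUntilGiveUp(RestartPolicy policy) {
        List<Long> delays = new ArrayList<>();
        while (policy.canRestart()) {
            delays.add(policy.nextDelayMillis());
        }
        return delays;
    }
}
```

With this split, `RestartDemo.delaysUntilGiveUp(new FixedDelayPolicy(3, 1000))` yields `[1000, 1000, 1000]`, and passing `new ExponentialBackoffPolicy(5, 100, 1000)` instead yields `[100, 200, 400, 800, 1000]` without changing the caller.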



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3187) Decouple restart strategy from ExecutionGraph

2016-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147442#comment-15147442
 ] 

ASF GitHub Bot commented on FLINK-3187:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1470#issuecomment-184239679
  
Merging this.




[jira] [Commented] (FLINK-3187) Decouple restart strategy from ExecutionGraph

2016-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144405#comment-15144405
 ] 

ASF GitHub Bot commented on FLINK-3187:
---

Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1470#issuecomment-183271832
  
Thank you very much! Behaviour is as I would expect it.

Last minor comment: can you add a comment in the deprecated methods (both 
Java and Scala) with a reference to the restart strategy while merging? I think 
this will make the transition easier for users.

+1 to merge.




[jira] [Commented] (FLINK-3187) Decouple restart strategy from ExecutionGraph

2016-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144406#comment-15144406
 ] 

ASF GitHub Bot commented on FLINK-3187:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1470#issuecomment-183272439
  
Sure, will do. Thanks for the review :-)




[jira] [Commented] (FLINK-3187) Decouple restart strategy from ExecutionGraph

2016-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144402#comment-15144402
 ] 

ASF GitHub Bot commented on FLINK-3187:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1470#issuecomment-183268932
  
Alright, I've reintroduced the old execution retries and delay 
configuration values as well as the API calls on the `ExecutionEnvironment`.

The behaviour is now the following: 

1. If an explicit `RestartStrategy` is set for a job, it is used. 

2. Otherwise, it is checked whether the number of retries and the retry delay 
have been set on the `ExecutionEnvironment`/`ExecutionConfig`. If this is the 
case, then a `FixedDelayRestartStrategy` is instantiated with these values.  

3. If neither of the above applies, then the 
default restart strategy of the `JobManager` is used. The default restart 
strategy is determined as follows:

3.1. If the configuration contains a `restart-strategy` value, then this value 
determines the `RestartStrategy` to use.

3.2. If `restart-strategy` is not set, then the old 
`execution-retries.default` and `execution-retries.delay` configuration values 
are checked. If they are set with `execution-retries.default > 0` and 
`execution-retries.delay >= 0`, then a `FixedDelayRestartStrategy` is 
instantiated with the respective values. This is then used as the default 
restart strategy. If these values are not defined, then a `NoRestartStrategy` 
is instantiated.

This should not be API-breaking unless users called 
`setExecutionRetries` on the `JobGraph` or the `Plan`.
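
The resolution order described in this comment can be sketched as a single function. This is a minimal illustration under stated assumptions, not the PR's code: `ResolvedStrategy` and `RestartStrategyResolver` are hypothetical names, `-1` stands for "not set" on the environment, the `JobManager` configuration is modelled as a plain `Map<String, String>`, and `execution-retries.delay` is assumed to hold a plain number of milliseconds (the real key may use a different format). Malformed numbers surface as `NumberFormatException`.

```java
import java.util.Map;

// Hypothetical stand-in for a resolved restart strategy (not a Flink class).
final class ResolvedStrategy {
    final String name;       // "fixed-delay", "no-restart", or "configured:<value>"
    final int attempts;
    final long delayMillis;

    ResolvedStrategy(String name, int attempts, long delayMillis) {
        this.name = name;
        this.attempts = attempts;
        this.delayMillis = delayMillis;
    }

    @Override public String toString() {
        return name + "(attempts=" + attempts + ", delayMillis=" + delayMillis + ")";
    }
}

final class RestartStrategyResolver {
    /**
     * @param jobStrategy    strategy set explicitly for the job, or null
     * @param envRetries     retries set via the old environment API, or -1 if unset
     * @param envDelayMillis retry delay set via the old environment API, or -1 if unset
     * @param clusterConfig  JobManager configuration as raw key/value strings
     */
    static ResolvedStrategy resolve(ResolvedStrategy jobStrategy, int envRetries,
                                    long envDelayMillis, Map<String, String> clusterConfig) {
        // 1. An explicit job-level strategy always wins.
        if (jobStrategy != null) {
            return jobStrategy;
        }
        // 2. Old-style values on the environment/config (assumed: both must be set).
        if (envRetries >= 0 && envDelayMillis >= 0) {
            return new ResolvedStrategy("fixed-delay", envRetries, envDelayMillis);
        }
        // 3. Otherwise the JobManager's default strategy applies.
        return clusterDefault(clusterConfig);
    }

    static ResolvedStrategy clusterDefault(Map<String, String> config) {
        // 3.1 The new key takes precedence; parsing its parameters is out of scope here.
        String configured = config.get("restart-strategy");
        if (configured != null) {
            return new ResolvedStrategy("configured:" + configured.trim(), 0, 0);
        }
        // 3.2 Fall back to the deprecated keys (delay assumed to be plain milliseconds).
        String retries = config.get("execution-retries.default");
        String delay = config.get("execution-retries.delay");
        if (retries != null && delay != null) {
            int attempts = Integer.parseInt(retries.trim());
            long delayMillis = Long.parseLong(delay.trim());
            if (attempts > 0 && delayMillis >= 0) {
                return new ResolvedStrategy("fixed-delay", attempts, delayMillis);
            }
        }
        // Nothing usable configured: do not restart.
        return new ResolvedStrategy("no-restart", 0, 0);
    }
}
```

Keeping the cluster default in its own method mirrors the split in the comment: steps 1 and 2 are per job, step 3 is per `JobManager`.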




[jira] [Commented] (FLINK-3187) Decouple restart strategy from ExecutionGraph

2016-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15143386#comment-15143386
 ] 

ASF GitHub Bot commented on FLINK-3187:
---

Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1470#issuecomment-183037438
  
Thanks for addressing the comments. Changes look good!

Regarding the API-breaking config key changes: is it feasible to just 
deprecate the old configuration keys instead of removing them? If they are set, 
we can just translate them to the matching restart strategy.




[jira] [Commented] (FLINK-3187) Decouple restart strategy from ExecutionGraph

2016-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15139297#comment-15139297
 ] 

ASF GitHub Bot commented on FLINK-3187:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1470#issuecomment-181981959
  
If nobody objects, then I would like to merge this PR, since it will give 
us more flexibility in the future with respect to restart strategies.




[jira] [Commented] (FLINK-3187) Decouple restart strategy from ExecutionGraph

2016-02-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15130207#comment-15130207
 ] 

ASF GitHub Bot commented on FLINK-3187:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1470#issuecomment-179156104
  
I addressed Ufuk's comments and rebased on the latest master + PR #1577. If 
we are ok with the changes, which are API-breaking because they change how the 
restart behaviour is defined, then I would like to merge this PR.

Any objections?




[jira] [Commented] (FLINK-3187) Decouple restart strategy from ExecutionGraph

2016-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15128092#comment-15128092
 ] 

ASF GitHub Bot commented on FLINK-3187:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1470#discussion_r51556295
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
@@ -237,53 +236,26 @@ public ExecutionConfig setParallelism(int parallelism) {
}
 
/**
-* Gets the number of times the system will try to re-execute failed tasks. A value
-* of {@code -1} indicates that the system default value (as defined in the configuration)
-* should be used.
+* Sets the restart strategy configuration which defines which restart strategy shall be used
+* for the execution graph of the corresponding job.
--- End diff --

Agreed. Good point. I've simplified the description and added a code 
example.




[jira] [Commented] (FLINK-3187) Decouple restart strategy from ExecutionGraph

2016-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15128093#comment-15128093
 ] 

ASF GitHub Bot commented on FLINK-3187:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1470#discussion_r51556358
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java ---
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+import java.util.concurrent.Callable;
+
+import static akka.dispatch.Futures.future;
+
+/**
+ * Restart strategy which tries to restart the given {@link ExecutionGraph} a fixed number of times
+ * with a fixed time delay in between.
+ */
+public class FixedDelayRestartStrategy implements RestartStrategy {
+   private static final Logger LOG = LoggerFactory.getLogger(FixedDelayRestartStrategy.class);
+
+
+   private final int maxNumberRestartAttempts;
+   private final long delayBetweenRestartAttempts;
+   private int currentRestartAttempt;
+
+   public FixedDelayRestartStrategy(
+   int maxNumberRestartAttempts,
+   long delayBetweenRestartAttempts) {
+
+   Preconditions.checkArgument(maxNumberRestartAttempts >= 0, "Maximum number of restart attempts must be positive.");
+   Preconditions.checkArgument(delayBetweenRestartAttempts >= 0, "Delay between restart attempts must be positive");
+
+   this.maxNumberRestartAttempts = maxNumberRestartAttempts;
+   this.delayBetweenRestartAttempts = delayBetweenRestartAttempts;
+   currentRestartAttempt = 0;
+   }
+
+   @Override
+   public boolean canRestart() {
+   return currentRestartAttempt < maxNumberRestartAttempts;
+   }
+
+   @Override
+   public void restart(final ExecutionGraph executionGraph) {
+   currentRestartAttempt++;
+
+   future(new Callable() {
+   @Override
+   public Object call() throws Exception {
+   try {
+   LOG.info("Delaying retry of job execution for {} ms ...", delayBetweenRestartAttempts);
+   // do the delay
+   Thread.sleep(delayBetweenRestartAttempts);
+   }
+   catch(InterruptedException e){
--- End diff --

good catch
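
The diff is cut off at the `catch (InterruptedException e)` branch the reviewer flagged, so the fix that was actually applied is not visible here. When a blocking `Thread.sleep` is kept, the usual Java idiom is to call `Thread.currentThread().interrupt()` in that catch block to restore the interrupt status. An alternative, sketched below as an assumption rather than the PR's code, avoids blocking a thread altogether and hands the delayed restart to a `ScheduledExecutorService`; `Restartable` and `ScheduledFixedDelayRestart` are hypothetical names.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the object being restarted (the ExecutionGraph in the PR).
interface Restartable {
    void restart();
}

// Fixed-delay restarts without parking a thread in Thread.sleep: the restart is
// handed to a scheduler, so there is no InterruptedException to handle here.
final class ScheduledFixedDelayRestart {
    private final int maxAttempts;
    private final long delayMillis;
    private final ScheduledExecutorService scheduler;
    private int currentAttempt;

    ScheduledFixedDelayRestart(int maxAttempts, long delayMillis, ScheduledExecutorService scheduler) {
        // Zero is allowed for both values, so the checks are for non-negative input.
        if (maxAttempts < 0) {
            throw new IllegalArgumentException("maxAttempts must be non-negative");
        }
        if (delayMillis < 0) {
            throw new IllegalArgumentException("delayMillis must be non-negative");
        }
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
        this.scheduler = scheduler;
    }

    synchronized boolean canRestart() {
        return currentAttempt < maxAttempts;
    }

    /** Counts an attempt and schedules target.restart() after the fixed delay. */
    synchronized ScheduledFuture<?> restart(Restartable target) {
        if (!canRestart()) {
            throw new IllegalStateException("No restart attempts left");
        }
        currentAttempt++;
        return scheduler.schedule(target::restart, delayMillis, TimeUnit.MILLISECONDS);
    }
}
```

Returning the `ScheduledFuture` lets a caller cancel a pending restart, for example when the job is cancelled during the delay.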



[jira] [Commented] (FLINK-3187) Decouple restart strategy from ExecutionGraph

2016-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15128108#comment-15128108
 ] 

ASF GitHub Bot commented on FLINK-3187:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1470#discussion_r51557088
  
--- Diff: docs/apis/fault_tolerance.md ---
@@ -193,73 +193,169 @@ state updates) of Flink coupled with bundled sinks:
 
 [Back to top](#top)
 
+Restart Strategies
+--
 
-Batch Processing Fault Tolerance (DataSet API)
---
+Flink supports different restart strategies which control how the jobs are restarted in case of a failure.
+The cluster can be started with a default restart strategy which is always used when no job specific restart strategy has been defined.
+In case that the job is submitted with a restart strategy, this strategy overrides the cluster's default setting.
--- End diff --

Yes, definitely




[jira] [Commented] (FLINK-3187) Decouple restart strategy from ExecutionGraph

2016-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15128109#comment-15128109
 ] 

ASF GitHub Bot commented on FLINK-3187:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1470#discussion_r51557132
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.restartstrategy;
+
+import java.io.Serializable;
+
+/**
+ * This class defines methods to generate RestartStrategyConfigurations. These configurations are
+ * used to create RestartStrategies at runtime.
+ *
+ * The RestartStrategyConfigurations are used to decouple the core module from the runtime module.
+ */
+public class RestartStrategies {
+
+   /**
+* Generates NoRestartStrategyConfiguration
+*
+* @return NoRestartStrategyConfiguration
+*/
+   public static RestartStrategyConfiguration noRestart() {
+   return new NoRestartStrategyConfiguration();
+   }
+
+   /**
+* Generates a FixedDelayRestartStrategyConfiguration.
+*
+* @param restartAttempts Number of restart attempts for the FixedDelayRestartStrategy
+* @param delayBetweenAttempts Delay in-between restart attempts for the FixedDelayRestartStrategy
+* @return FixedDelayRestartStrategy
+*/
+   public static RestartStrategyConfiguration fixedDelayRestart(
+   int restartAttempts,
+   long delayBetweenAttempts) {
+
+   return new FixedDelayRestartStrategyConfiguration(restartAttempts, delayBetweenAttempts);
+   }
+
+   public abstract static class RestartStrategyConfiguration implements Serializable {
+   private static final long serialVersionUID = 6285853591578313960L;
+
+   private RestartStrategyConfiguration() {}
+
+   /**
+* Returns a description which is shown in the web interface
+* @return
--- End diff --

Added them
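
The `RestartStrategies` Javadoc in the diff above states that the serializable configurations exist to decouple the core module from the runtime module. A minimal sketch of that split, with hypothetical simplified names (`RestartConfigBase`, `NoRestartConfig`, `FixedDelayConfig`, `RuntimeRestartStrategy`, `RestartStrategyFactory`): the core side holds only serializable data, and a runtime-side factory turns it into stateful behaviour. This illustrates the idea and is not the PR's actual factory.

```java
import java.io.Serializable;

// "Core" side: plain serializable descriptions with no runtime dependencies.
abstract class RestartConfigBase implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Human-readable summary, e.g. for display in a web interface. */
    abstract String description();
}

final class NoRestartConfig extends RestartConfigBase {
    private static final long serialVersionUID = 1L;

    @Override String description() { return "no restarts"; }
}

final class FixedDelayConfig extends RestartConfigBase {
    private static final long serialVersionUID = 1L;
    final int attempts;
    final long delayMillis;

    FixedDelayConfig(int attempts, long delayMillis) {
        this.attempts = attempts;
        this.delayMillis = delayMillis;
    }

    @Override String description() {
        return "fixed delay: " + attempts + " attempts, " + delayMillis + " ms apart";
    }
}

// "Runtime" side: the stateful object that actually tracks restart attempts.
final class RuntimeRestartStrategy {
    final long delayMillis;
    private int remainingAttempts;

    RuntimeRestartStrategy(int attempts, long delayMillis) {
        this.remainingAttempts = attempts;
        this.delayMillis = delayMillis;
    }

    /** Returns true and consumes an attempt if one is left. */
    boolean tryConsumeAttempt() {
        if (remainingAttempts <= 0) {
            return false;
        }
        remainingAttempts--;
        return true;
    }
}

// Runtime-side factory: the only place that maps configurations to strategies.
final class RestartStrategyFactory {
    static RuntimeRestartStrategy create(RestartConfigBase config) {
        if (config instanceof NoRestartConfig) {
            return new RuntimeRestartStrategy(0, 0);
        }
        if (config instanceof FixedDelayConfig) {
            FixedDelayConfig fixed = (FixedDelayConfig) config;
            return new RuntimeRestartStrategy(fixed.attempts, fixed.delayMillis);
        }
        throw new IllegalArgumentException("Unknown restart strategy configuration: " + config.description());
    }
}
```

Because only the configuration object is shipped with the job, the client never needs the runtime classes on its classpath; the mutable attempt counter exists only on the runtime side.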




[jira] [Commented] (FLINK-3187) Decouple restart strategy from ExecutionGraph

2016-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15114954#comment-15114954
 ] 

ASF GitHub Bot commented on FLINK-3187:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1470#issuecomment-174450252
  
What's the status of this PR?




[jira] [Commented] (FLINK-3187) Decouple restart strategy from ExecutionGraph

2016-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15115011#comment-15115011
 ] 

ASF GitHub Bot commented on FLINK-3187:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1470#issuecomment-174463868
  
Haven't received a +1 for merging yet.




[jira] [Commented] (FLINK-3187) Decouple restart strategy from ExecutionGraph

2016-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15115040#comment-15115040
 ] 

ASF GitHub Bot commented on FLINK-3187:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1470#discussion_r50679685
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
@@ -237,53 +236,26 @@ public ExecutionConfig setParallelism(int parallelism) {
}
 
/**
-* Gets the number of times the system will try to re-execute failed tasks. A value
-* of {@code -1} indicates that the system default value (as defined in the configuration)
-* should be used.
+* Sets the restart strategy configuration which defines which restart strategy shall be used
+* for the execution graph of the corresponding job.
--- End diff --
--- End diff --

I would add a code example showing the `RestartStrategies`, 
which will be the common way to configure it, I guess.

The text could maybe also be simplified at the end by removing execution 
graph and corresponding job. The average user will not know what it is. On the 
other hand, it might be a good pointer for someone who wants to work on it.
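A sketch of the kind of Javadoc usage example suggested above, placed on a hypothetical, heavily simplified config class. `SketchExecutionConfig`, `SketchRestartStrategies` and their members are illustrative stand-ins modeled on `ExecutionConfig` and `RestartStrategies`, not the classes in the PR. A `null` strategy is assumed to mean "use the cluster default".

```java
import java.util.Objects;

/** Hypothetical stand-in for a RestartStrategies-style factory class; not Flink's API. */
final class SketchRestartStrategies {
    private SketchRestartStrategies() {}

    /** Immutable description of which strategy to use; the strategy itself is built elsewhere. */
    static final class Config {
        final String kind;
        final int attempts;
        final long delayMillis;

        private Config(String kind, int attempts, long delayMillis) {
            this.kind = kind;
            this.attempts = attempts;
            this.delayMillis = delayMillis;
        }
    }

    /** Never restart a failed job. */
    static Config noRestart() {
        return new Config("none", 0, 0L);
    }

    /** Restart up to {@code attempts} times, waiting {@code delayMillis} between attempts. */
    static Config fixedDelayRestart(int attempts, long delayMillis) {
        if (attempts < 0 || delayMillis < 0) {
            throw new IllegalArgumentException("Attempts and delay must be non-negative.");
        }
        return new Config("fixed-delay", attempts, delayMillis);
    }
}

/** Hypothetical, simplified execution config holding an optional job-specific strategy. */
final class SketchExecutionConfig {
    // null means "no job-specific strategy; use the cluster default".
    private SketchRestartStrategies.Config restartStrategy;

    /**
     * Sets the restart strategy to use when this job fails.
     *
     * <p>Example:
     * <pre>{@code
     * config.setRestartStrategy(SketchRestartStrategies.fixedDelayRestart(3, 10_000L));
     * }</pre>
     */
    SketchExecutionConfig setRestartStrategy(SketchRestartStrategies.Config restartStrategy) {
        this.restartStrategy = Objects.requireNonNull(restartStrategy, "restartStrategy");
        return this;
    }

    /** Returns the job-specific strategy, or null if the cluster default applies. */
    SketchRestartStrategies.Config getRestartStrategy() {
        return restartStrategy;
    }
}
```

The `<pre>{@code ...}</pre>` form keeps the example readable in the rendered Javadoc without escaping, which addresses the point that users will mostly configure this through the factory methods.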




[jira] [Commented] (FLINK-3187) Decouple restart strategy from ExecutionGraph

2016-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15115052#comment-15115052
 ] 

ASF GitHub Bot commented on FLINK-3187:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1470#discussion_r50681234
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java ---
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+import java.util.concurrent.Callable;
+
+import static akka.dispatch.Futures.future;
+
+/**
+ * Restart strategy which tries to restart the given {@link ExecutionGraph} a fixed number of times
+ * with a fixed time delay in between.
+ */
+public class FixedDelayRestartStrategy implements RestartStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(FixedDelayRestartStrategy.class);
+
+
+    private final int maxNumberRestartAttempts;
+    private final long delayBetweenRestartAttempts;
+    private int currentRestartAttempt;
+
+    public FixedDelayRestartStrategy(
+            int maxNumberRestartAttempts,
+            long delayBetweenRestartAttempts) {
+
+        Preconditions.checkArgument(maxNumberRestartAttempts >= 0, "Maximum number of restart attempts must be positive.");
+        Preconditions.checkArgument(delayBetweenRestartAttempts >= 0, "Delay between restart attempts must be positive");
+
+        this.maxNumberRestartAttempts = maxNumberRestartAttempts;
+        this.delayBetweenRestartAttempts = delayBetweenRestartAttempts;
+        currentRestartAttempt = 0;
+    }
+
+    @Override
+    public boolean canRestart() {
+        return currentRestartAttempt < maxNumberRestartAttempts;
+    }
+
+    @Override
+    public void restart(final ExecutionGraph executionGraph) {
+        currentRestartAttempt++;
+
+        future(new Callable() {
+            @Override
+            public Object call() throws Exception {
+                try {
+                    LOG.info("Delaying retry of job execution for {} ms ...", delayBetweenRestartAttempts);
+                    // do the delay
+                    Thread.sleep(delayBetweenRestartAttempts);
+                }
+                catch(InterruptedException e){
--- End diff --

whitespace after catch
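The strategy above delays the restart by calling `Thread.sleep` inside an Akka `future`, which keeps a thread blocked for the whole delay. A swapped-in alternative, sketched below, hands the delay to a `java.util.concurrent.ScheduledExecutorService` instead. The class name `ScheduledFixedDelayRestart` and the `Runnable` standing in for "restart the `ExecutionGraph`" are hypothetical; this is not the PR's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical fixed-delay restart strategy that schedules the restart instead of
 * sleeping on a worker thread. The Runnable stands in for restarting the job.
 */
final class ScheduledFixedDelayRestart {
    private final int maxAttempts;
    private final long delayMillis;
    private final ScheduledExecutorService scheduler;
    private int currentAttempt = 0;

    ScheduledFixedDelayRestart(int maxAttempts, long delayMillis, ScheduledExecutorService scheduler) {
        // The checks accept 0, so the messages say "non-negative".
        if (maxAttempts < 0) {
            throw new IllegalArgumentException("Maximum number of restart attempts must be non-negative.");
        }
        if (delayMillis < 0) {
            throw new IllegalArgumentException("Delay between restart attempts must be non-negative.");
        }
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
        this.scheduler = scheduler;
    }

    synchronized boolean canRestart() {
        return currentAttempt < maxAttempts;
    }

    /**
     * Counts one attempt and runs restartAction after delayMillis on the scheduler thread.
     * The returned future completes when the action has run, or fails if it threw.
     */
    synchronized CompletableFuture<Void> restart(Runnable restartAction) {
        if (!canRestart()) {
            throw new IllegalStateException("No restart attempts left.");
        }
        currentAttempt++;
        CompletableFuture<Void> done = new CompletableFuture<>();
        scheduler.schedule(() -> {
            try {
                restartAction.run();
                done.complete(null);
            } catch (Throwable t) {
                done.completeExceptionally(t);
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
        return done;
    }
}
```

No thread sits idle while waiting; the trade-off is that the scheduler's thread runs the restart action itself, so long-running restart logic would need to be handed off again.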



[jira] [Commented] (FLINK-3187) Decouple restart strategy from ExecutionGraph

2016-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15115059#comment-15115059
 ] 

ASF GitHub Bot commented on FLINK-3187:
---

Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1470#issuecomment-174477502
  
Changes look very good. I've tried it out with the two provided 
`RestartStrategies` and it works like a charm. ;) (I didn't try to write a 
custom strategy, but that is OK I guess. The code looks straightforward there.)

I had some trivial inline comments, which should not block merging this if 
you don't agree. +1 to merge.




[jira] [Commented] (FLINK-3187) Decouple restart strategy from ExecutionGraph

2016-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15115054#comment-15115054
 ] 

ASF GitHub Bot commented on FLINK-3187:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1470#discussion_r50681471
  
--- Diff: docs/apis/fault_tolerance.md ---
@@ -193,73 +193,169 @@ state updates) of Flink coupled with bundled sinks:
 
 [Back to top](#top)
 
+Restart Strategies
+--
 
-Batch Processing Fault Tolerance (DataSet API)
---
+Flink supports different restart strategies which control how the jobs are restarted in case of a failure.
+The cluster can be started with a default restart strategy which is always used when no job specific restart strategy has been defined.
+In case that the job is submitted with a restart strategy, this strategy overrides the cluster's default setting.
--- End diff --

Should we add a sentence that the current cluster default is `no restart`?
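The precedence rule in the docs diff (a job-specific strategy overrides the cluster default) can be sketched as below. `RestartSettings` and `RestartSettingsResolver` are hypothetical names. Treating a missing cluster default as "no restart" follows the reviewer's remark about the current default; it is an assumption, not something the docs diff itself states.

```java
import java.util.Optional;

/** Hypothetical restart settings: how often to restart and how long to wait in between. */
final class RestartSettings {
    /** "No restart": zero attempts. */
    static final RestartSettings NO_RESTART = new RestartSettings(0, 0L);

    final int maxAttempts;
    final long delayMillis;

    RestartSettings(int maxAttempts, long delayMillis) {
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
    }
}

/** Picks the settings for a job: the job's own settings win over the cluster default. */
final class RestartSettingsResolver {
    private final RestartSettings clusterDefault;

    /** A cluster started without a default falls back to NO_RESTART (assumption). */
    RestartSettingsResolver(RestartSettings clusterDefault) {
        this.clusterDefault = clusterDefault != null ? clusterDefault : RestartSettings.NO_RESTART;
    }

    /** Returns the job-specific settings if present, otherwise the cluster default. */
    RestartSettings resolve(Optional<RestartSettings> jobSpecific) {
        return jobSpecific.orElse(clusterDefault);
    }
}
```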




[jira] [Commented] (FLINK-3187) Decouple restart strategy from ExecutionGraph

2016-01-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15087394#comment-15087394
 ] 

ASF GitHub Bot commented on FLINK-3187:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1470#issuecomment-169675092
  
Thanks for the review @rmetzger.

I don't think this is a problem, because the user cannot define their own restart 
strategies. In order to set a `RestartStrategy`, the user has to provide a 
`RestartStrategyConfiguration`. The `RestartStrategyConfiguration` cannot be 
extended outside the `RestartStrategies` class so that the user cannot define 
his own `RestartStrategyConfiguration`. Additionally, the strategy itself will 
only be instantiated from this configuration on the `JobManager` via the 
`RestartStrategyFactory`. This is also code which cannot be changed by the user 
via the API.
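One common Java idiom for the restriction described above (configurations can only be created through one class, and the actual strategy is instantiated server-side by a factory) is an abstract base class whose only constructor is private, so only classes nested in the same top-level class can extend it. A minimal sketch of that idiom; `Strategies`, `Restarter` and `RestarterFactory` are hypothetical names and this is not the PR's code.

```java
/** Hypothetical holder for strategy configurations; models the idiom, not Flink's code. */
final class Strategies {
    private Strategies() {}

    /**
     * The only constructor is private, so only classes nested inside Strategies can
     * extend this type; code elsewhere gets instances only via the factory methods.
     */
    abstract static class StrategyConfiguration {
        private StrategyConfiguration() {}
    }

    static final class NoRestartConfiguration extends StrategyConfiguration {
        private NoRestartConfiguration() {}
    }

    static final class FixedDelayConfiguration extends StrategyConfiguration {
        final int attempts;
        final long delayMillis;

        private FixedDelayConfiguration(int attempts, long delayMillis) {
            this.attempts = attempts;
            this.delayMillis = delayMillis;
        }
    }

    static StrategyConfiguration noRestart() {
        return new NoRestartConfiguration();
    }

    static StrategyConfiguration fixedDelay(int attempts, long delayMillis) {
        return new FixedDelayConfiguration(attempts, delayMillis);
    }
}

/** Hypothetical runtime strategy, created only on the "server" side. */
interface Restarter {
    boolean canRestart();

    void recordRestart();

    long delayMillis();
}

/** Hypothetical server-side factory turning a configuration into a runtime strategy. */
final class RestarterFactory {
    private RestarterFactory() {}

    static Restarter create(Strategies.StrategyConfiguration config) {
        if (config instanceof Strategies.FixedDelayConfiguration) {
            final Strategies.FixedDelayConfiguration fixed = (Strategies.FixedDelayConfiguration) config;
            return new Restarter() {
                private int attempts = 0;

                @Override public boolean canRestart() { return attempts < fixed.attempts; }

                @Override public void recordRestart() { attempts++; }

                @Override public long delayMillis() { return fixed.delayMillis; }
            };
        }
        if (config instanceof Strategies.NoRestartConfiguration) {
            return new Restarter() {
                @Override public boolean canRestart() { return false; }

                @Override public void recordRestart() {
                    throw new IllegalStateException("Restarts are disabled.");
                }

                @Override public long delayMillis() { return 0L; }
            };
        }
        // Only reached for null, since Strategies defines no other subclasses.
        throw new IllegalArgumentException("Unknown configuration: " + config);
    }
}
```

An attempt such as `new Strategies.StrategyConfiguration() {}` outside `Strategies` fails to compile because the constructor is private, so the factory only ever sees the configuration types it knows how to build.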

