[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2019-01-28 Thread Robert Metzger (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16754085#comment-16754085 ]

Robert Metzger commented on FLINK-9143:
---

I assume we can close this pull request (https://github.com/apache/flink/pull/5846), since the issue has been fixed by now?

> Restart strategy defined in flink-conf.yaml is ignored
> --
>
> Key: FLINK-9143
> URL: https://issues.apache.org/jira/browse/FLINK-9143
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.4.2
>Reporter: Alex Smirnov
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
> Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>
> The restart strategy defined in flink-conf.yaml is disregarded when the user enables checkpointing.
> Steps to reproduce:
> 1. Download the Flink distribution (1.4.2) and update flink-conf.yaml:
>   
>  restart-strategy: none
>  state.backend: rocksdb
>  state.backend.fs.checkpointdir: file:///tmp/nfsrecovery/flink-checkpoints-metadata
>  state.backend.rocksdb.checkpointdir: file:///tmp/nfsrecovery/flink-checkpoints-rocksdb
>   
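>  2. Create a new Java project as described at
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html
>  Here's the code:
>  import java.util.Arrays;
>
>  import org.apache.flink.api.common.functions.MapFunction;
>  import org.apache.flink.streaming.api.CheckpointingMode;
>  import org.apache.flink.streaming.api.datastream.DataStream;
>  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>  import org.slf4j.Logger;
>  import org.slf4j.LoggerFactory;
>
>  public class FailedJob
>  {
>      static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class);
>
>      public static void main( String[] args ) throws Exception
>      {
>          final StreamExecutionEnvironment env =
>                  StreamExecutionEnvironment.getExecutionEnvironment();
>          // Enabling checkpointing is what triggers the unexpected restart strategy.
>          env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
>          DataStream<String> stream = env.fromCollection(Arrays.asList("test"));
>          // Every record fails, so the job immediately needs a restart decision.
>          stream.map(new MapFunction<String, String>() {
>              @Override
>              public String map(String obj) {
>                  throw new NullPointerException("NPE");
>              }
>          });
>          env.execute("Failed job");
>      }
>  }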
>   
>  3. Compile with mvn clean package and submit the job to the cluster.
>   
>  4. Go to the Job Manager configuration in the web UI and check that the settings from
> flink-conf.yaml are there (screenshot attached).
>   
>  5. Go to the job's configuration and look at the Execution Configuration section.
>   
>  *Expected result*: restart strategy as defined in flink-conf.yaml
>   
>  *Actual result*: Restart with fixed delay (1 ms). #2147483647 restart attempts (2147483647 is Integer.MAX_VALUE).
>   
>   
>  See the attached screenshots and the jobmanager log (lines 1 and 31).
>   
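The reported behaviour can be reproduced in a small, Flink-free model. This is a sketch under stated assumptions: `LegacyRestartResolution` and its string labels are hypothetical. The first rule it encodes is taken from the `StreamingJobGraphGenerator` diff quoted later in this thread (when checkpointing is enabled and the job set no restart strategy, a fixed-delay strategy with `Integer.MAX_VALUE` attempts is injected into the job's `ExecutionConfig`); the second rule, that a job-level strategy takes precedence over flink-conf.yaml, is an assumption of the sketch.

```java
import java.util.Optional;

/**
 * Hypothetical model of how Flink 1.4.2 ended up with a restart strategy
 * for a streaming job. Strategies are plain strings to keep it small.
 */
final class LegacyRestartResolution {

    /** Mirrors the removed generator branch: inject a default on the client side. */
    static Optional<String> jobLevelStrategy(boolean checkpointingEnabled,
                                             Optional<String> userStrategy) {
        if (checkpointingEnabled && !userStrategy.isPresent()) {
            return Optional.of("fixed-delay(" + Integer.MAX_VALUE + " attempts)");
        }
        return userStrategy;
    }

    /** Assumed precedence: a job-level strategy wins over flink-conf.yaml. */
    static String effectiveStrategy(Optional<String> jobLevel, String clusterConfigured) {
        return jobLevel.orElse(clusterConfigured);
    }
}
```

With `restart-strategy: none` in flink-conf.yaml and `enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)` in the job, `jobLevelStrategy(true, Optional.empty())` already returns the fixed-delay strategy, so `effectiveStrategy` never falls through to the cluster value, which matches the *Actual result* above.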



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544056#comment-16544056 ]

ASF GitHub Bot commented on FLINK-9143:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6283


> Restart strategy defined in flink-conf.yaml is ignored
> --
>
> Key: FLINK-9143
> URL: https://issues.apache.org/jira/browse/FLINK-9143
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.4.2
>Reporter: Alex Smirnov
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
> Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>


[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542993#comment-16542993 ]

ASF GitHub Bot commented on FLINK-9143:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/6283
  
Alright, I think I fetched the last commit as well. Once Travis gives the green light, I'll merge it.


> Restart strategy defined in flink-conf.yaml is ignored
> --
>
> Key: FLINK-9143
> URL: https://issues.apache.org/jira/browse/FLINK-9143
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.4.2
>Reporter: Alex Smirnov
>Assignee: yuqi
>Priority: Major
>  Labels: pull-request-available
> Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>


[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542988#comment-16542988 ]

ASF GitHub Bot commented on FLINK-9143:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/6283
  
Thanks @tillrohrmann for the review. I've pushed one more commit that fixes a test failure: it adds a proper comparison of `RestartStrategies`, without which `org.apache.flink.api.common.ExecutionConfigTest#testExecutionConfigSerialization` fails.
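Why a missing comparison can break a serialization test: a round trip through Java serialization yields a new instance, so an equality assertion on the copy falls back to `Object.equals` (reference identity) unless the class overrides `equals` and `hashCode`. A minimal sketch; `FixedDelayConfig` and `SerializationRoundTrip` are hypothetical stand-ins, not Flink's `RestartStrategies` classes, and whether `ExecutionConfigTest` compares configurations exactly this way is an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

/** Hypothetical restart configuration with value-based equality. */
final class FixedDelayConfig implements Serializable {
    private static final long serialVersionUID = 1L;

    final int restartAttempts;
    final long delayMillis;

    FixedDelayConfig(int restartAttempts, long delayMillis) {
        this.restartAttempts = restartAttempts;
        this.delayMillis = delayMillis;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        FixedDelayConfig that = (FixedDelayConfig) o;
        return restartAttempts == that.restartAttempts && delayMillis == that.delayMillis;
    }

    @Override
    public int hashCode() {
        // Must agree with equals: equal configs produce equal hashes.
        return Objects.hash(restartAttempts, delayMillis);
    }
}

final class SerializationRoundTrip {
    /** Serializes and deserializes a value, returning a fresh copy. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }
}
```

Without the two overrides, `roundTrip(config).equals(config)` would be `false` even though every field matches.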


> Restart strategy defined in flink-conf.yaml is ignored
> --
>
> Key: FLINK-9143
> URL: https://issues.apache.org/jira/browse/FLINK-9143
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.4.2
>Reporter: Alex Smirnov
>Assignee: yuqi
>Priority: Major
>  Labels: pull-request-available
> Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>


[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542979#comment-16542979 ]

ASF GitHub Bot commented on FLINK-9143:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202333518
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---
@@ -567,22 +560,16 @@ private void configureCheckpointing() {
 
         long interval = cfg.getCheckpointInterval();
         if (interval > 0) {
-
             ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
             // propagate the expected behaviour for checkpoint errors to task.
             executionConfig.setFailTaskOnCheckpointError(cfg.isFailOnCheckpointingErrors());
-
-            // check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy
-            if (executionConfig.getRestartStrategy() == null) {
-                // if the user enabled checkpointing, the default number of exec retries is infinite.
-                executionConfig.setRestartStrategy(
-                    RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
-            }
         } else {
             // interval of max value means disable periodic checkpoint
             interval = Long.MAX_VALUE;
         }
 
+
+
--- End diff --

Please remove the two extra line breaks.


> Restart strategy defined in flink-conf.yaml is ignored
> --
>
> Key: FLINK-9143
> URL: https://issues.apache.org/jira/browse/FLINK-9143
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.4.2
>Reporter: Alex Smirnov
>Assignee: yuqi
>Priority: Major
>  Labels: pull-request-available
> Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>


[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542921#comment-16542921 ]

ASF GitHub Bot commented on FLINK-9143:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202320783
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---
@@ -357,6 +362,42 @@ public void testRestoringFromSavepoint() throws Exception {
         }
     }
 
+    /**
+     * Tests that in a streaming use case where checkpointing is enabled, a
+     * fixed delay with Integer.MAX_VALUE retries is instantiated if no other restart
+     * strategy has been specified.
+     */
+    @Test
+    public void testAutomaticRestartingWhenCheckpointing() throws Exception {
+        // create savepoint data
+        final long savepointId = 42L;
+        final File savepointFile = createSavepoint(savepointId);
+
+        // set savepoint settings
+        final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath(
+            savepointFile.getAbsolutePath(),
+            true);
+        final JobGraph jobGraph = createJobGraphWithCheckpointing(savepointRestoreSettings);
+
+        final StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
+        final TestingCheckpointRecoveryFactory testingCheckpointRecoveryFactory = new TestingCheckpointRecoveryFactory(
+            completedCheckpointStore,
+            new StandaloneCheckpointIDCounter());
+        haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
+        final JobMaster jobMaster = createJobMaster(
+            new Configuration(),
+            jobGraph,
+            haServices,
+            new TestingJobManagerSharedServicesBuilder().build());

This was caused by the wrong handling of the default value in `RestartStrategyFactory`. Fixed now.
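The test's Javadoc states the intended default: with checkpointing enabled and no other restart strategy specified, a fixed-delay strategy with `Integer.MAX_VALUE` retries is used. Combined with the issue's expectation that flink-conf.yaml is honoured, this suggests a three-level precedence, sketched below as a Flink-free model. `RestartPrecedence` and its `Strategy` enum are hypothetical, and the rule that a job-level setting beats the cluster configuration is an assumption of this sketch rather than something stated in this thread.

```java
import java.util.Optional;

/** Hypothetical model of restart strategy precedence after the FLINK-9143 fix. */
final class RestartPrecedence {

    enum Strategy { NO_RESTART, FIXED_DELAY_MAX_ATTEMPTS, FAILURE_RATE }

    /**
     * @param jobLevel             strategy set explicitly in the job, if any
     * @param clusterLevel         strategy configured in flink-conf.yaml, if any
     * @param checkpointingEnabled whether periodic checkpointing is enabled for the job
     */
    static Strategy resolve(Optional<Strategy> jobLevel,
                            Optional<Strategy> clusterLevel,
                            boolean checkpointingEnabled) {
        if (jobLevel.isPresent()) {
            return jobLevel.get();       // 1. an explicit job-level choice wins
        }
        if (clusterLevel.isPresent()) {
            return clusterLevel.get();   // 2. then the cluster configuration
        }
        // 3. nothing configured anywhere: infinite fixed-delay restarts only with checkpointing
        return checkpointingEnabled ? Strategy.FIXED_DELAY_MAX_ATTEMPTS : Strategy.NO_RESTART;
    }
}
```

The key difference from the 1.4.2 behaviour is that the checkpointing-dependent default sits at the bottom, so a `restart-strategy: none` entry in flink-conf.yaml is no longer shadowed.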


> Restart strategy defined in flink-conf.yaml is ignored
> --
>
> Key: FLINK-9143
> URL: https://issues.apache.org/jira/browse/FLINK-9143
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.4.2
>Reporter: Alex Smirnov
>Assignee: yuqi
>Priority: Major
>  Labels: pull-request-available
> Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>


[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542899#comment-16542899 ]

ASF GitHub Bot commented on FLINK-9143:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202319384
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -339,12 +339,28 @@ public void setSnapshotSettings(JobCheckpointingSettings settings) {
      * Gets the settings for asynchronous snapshots. This method returns null, when
      * checkpointing is not enabled.
      *
-     * @return The snapshot settings, or null, if checkpointing is not enabled.
+     * @return The snapshot settings
      */
     public JobCheckpointingSettings getCheckpointingSettings() {
         return snapshotSettings;
     }
 
+    /**
+     * Checks if the checkpointing was enabled for this job graph
+     *
+     * @return true if checkpointing enabled
+     */
+    public boolean isCheckpointingEnabled() {
+
+        if (snapshotSettings == null) {
+            return false;
+        }
+
+        long checkpointInterval = snapshotSettings.getCheckpointCoordinatorConfiguration().getCheckpointInterval();
+        return checkpointInterval > 0 &&
+            checkpointInterval < Long.MAX_VALUE;

I don't think that is true (the part about enabling checkpointing). I thought the same based on some Javadocs, but it turned out that `snapshotSettings` is always set in `org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator#configureCheckpointing`. That's why I added this method.

The problem with the second approach is that the `CheckpointCoordinator` is created while constructing the `ExecutionGraph`, which already requires the restart strategy. I thought adding this method was the least invasive option.
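The disagreement is about which predicate defines "checkpointing enabled". If, as described above, `configureCheckpointing` always attaches snapshot settings and encodes "no periodic checkpoints" as an interval of `Long.MAX_VALUE`, then a plain `snapshotSettings != null` check is true for every streaming job. A self-contained sketch contrasting the two predicates; `SnapshotSettings` and `CheckpointingPredicates` are hypothetical stand-ins for `JobCheckpointingSettings` and `JobGraph`, while the interval condition is copied from the diff.

```java
/** Hypothetical stand-in for JobCheckpointingSettings: only the interval matters here. */
final class SnapshotSettings {
    final long checkpointInterval;

    SnapshotSettings(long checkpointInterval) {
        this.checkpointInterval = checkpointInterval;
    }
}

final class CheckpointingPredicates {

    /** "Enabled" in the sense of "a checkpoint coordinator would be created". */
    static boolean hasSnapshotSettings(SnapshotSettings settings) {
        return settings != null;
    }

    /** Condition from the proposed JobGraph#isCheckpointingEnabled: periodic checkpoints are configured. */
    static boolean isPeriodicCheckpointingEnabled(SnapshotSettings settings) {
        if (settings == null) {
            return false;
        }
        long interval = settings.checkpointInterval;
        return interval > 0 && interval < Long.MAX_VALUE;
    }
}
```

For a job that never calls `enableCheckpointing`, settings carrying an interval of `Long.MAX_VALUE` make `hasSnapshotSettings` return `true` but `isPeriodicCheckpointingEnabled` return `false`; only the latter avoids choosing the infinite fixed-delay default for such a job.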


> Restart strategy defined in flink-conf.yaml is ignored
> --
>
> Key: FLINK-9143
> URL: https://issues.apache.org/jira/browse/FLINK-9143
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.4.2
>Reporter: Alex Smirnov
>Assignee: yuqi
>Priority: Major
>  Labels: pull-request-available
> Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>


[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542859#comment-16542859 ]

ASF GitHub Bot commented on FLINK-9143:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202311125
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java ---
@@ -149,7 +149,7 @@ public static RestartStrategyFactory createRestartStrategyFactory(Configuration
             }
 
             // fallback in case of an error
-            return NoRestartStrategy.createFactory(configuration);
+            return NoOrFixedIfCheckpointingEnabledRestartStrategy.createFactory(configuration);
--- End diff --

For some reason I assumed the `default` branch is reached when nothing was set in the config. My mistake.

I've fixed it to differentiate between the case where `"none"` was set (this value is used throughout the documentation, and I think it should translate directly to `NoRestart`) and the case where the config was not set at all.
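The distinction between an explicit `"none"` and a missing `restart-strategy` entry can be sketched as a small parser. This is a hypothetical, simplified model, not Flink's `RestartStrategyFactory`: it uses `null` for "key not set", recognises only `"none"` and `"fixed-delay"`, and rejects any other value instead of modelling the factory's remaining strategies and its error fallback.

```java
import java.util.Locale;

/** Hypothetical outcome of reading the restart-strategy key from flink-conf.yaml. */
enum ClusterRestartSetting { NOT_SET, NO_RESTART, FIXED_DELAY }

final class RestartStrategyConfigParser {

    /**
     * @param value raw value of "restart-strategy", or null when the key is absent
     */
    static ClusterRestartSetting parse(String value) {
        if (value == null || value.trim().isEmpty()) {
            // Key absent: defer to the checkpointing-dependent default.
            return ClusterRestartSetting.NOT_SET;
        }
        switch (value.trim().toLowerCase(Locale.ROOT)) {
            case "none":
                // An explicit "none" means no restarts, even with checkpointing enabled.
                return ClusterRestartSetting.NO_RESTART;
            case "fixed-delay":
                return ClusterRestartSetting.FIXED_DELAY;
            default:
                throw new IllegalArgumentException("Unsupported restart-strategy in this sketch: " + value);
        }
    }
}
```

Keeping `NOT_SET` separate from `NO_RESTART` is the whole point: collapsing both into one branch is what made the default path and the explicit `"none"` indistinguishable.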


> Restart strategy defined in flink-conf.yaml is ignored
> --
>
> Key: FLINK-9143
> URL: https://issues.apache.org/jira/browse/FLINK-9143
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.4.2
>Reporter: Alex Smirnov
>Assignee: yuqi
>Priority: Major
>  Labels: pull-request-available
> Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>


[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542820#comment-16542820 ]

ASF GitHub Bot commented on FLINK-9143:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202297182
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoOrFixedIfCheckpointingEnabledRestartStrategy.java ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+
+/**
+ * Default restart strategy that resolves either to {@link NoRestartStrategy} or {@link FixedDelayRestartStrategy}
+ * depending if checkpointing was enabled.
+ */
+public class NoOrFixedIfCheckpointingEnabledRestartStrategy implements RestartStrategy {
+
+    private static final long DEFAULT_RESTART_DELAY = 0;
+
+    private final RestartStrategy resolvedStrategy;
+
+    /**
+     * Creates a NoOrFixedIfCheckpointingEnabledRestartStrategyFactory instance.
+     *
+     * @param configuration Configuration object which is ignored
+     * @return NoOrFixedIfCheckpointingEnabledRestartStrategyFactory instance
+     */
+    public static NoOrFixedIfCheckpointingEnabledRestartStrategyFactory createFactory(Configuration configuration) {
+        return new NoOrFixedIfCheckpointingEnabledRestartStrategyFactory();
+    }
+
+    /**
+     * Creates instance of NoOrFixedIfCheckpointingEnabledRestartStrategy
+     *
+     * @param isCheckpointingEnabled if true resolves to {@link FixedDelayRestartStrategy}
+     * otherwise to {@link NoRestartStrategy}
+     */
+    public NoOrFixedIfCheckpointingEnabledRestartStrategy(boolean isCheckpointingEnabled) {
+        if (isCheckpointingEnabled) {
+            resolvedStrategy = new FixedDelayRestartStrategy(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY);
+        } else {
+            resolvedStrategy = new NoRestartStrategy();
+        }
+    }
+
+    @Override
+    public boolean canRestart() {
+        return resolvedStrategy.canRestart();
+    }
+
+    @Override
+    public void restart(RestartCallback restarter, ScheduledExecutor executor) {
+        resolvedStrategy.restart(restarter, executor);
+    }
+
+    public static class NoOrFixedIfCheckpointingEnabledRestartStrategyFactory extends RestartStrategyFactory {
--- End diff --

Wouldn't it be enough to have only this restart strategy factory, without the corresponding `RestartStrategy`? We could instantiate the respective strategy in the `createRestartStrategy(boolean isCheckpointingEnabled)` method.
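A sketch of the suggested alternative: keep only a factory that decides, at creation time, which existing strategy to return, instead of adding a wrapper `RestartStrategy` that delegates every call. The interface and classes below are simplified, hypothetical stand-ins (no `ScheduledExecutor`, no restart callbacks); only the method name `createRestartStrategy(boolean isCheckpointingEnabled)` and the `Integer.MAX_VALUE` attempts with a delay of `0` come from this thread.

```java
/** Simplified, hypothetical stand-in for Flink's RestartStrategy interface. */
interface SimpleRestartStrategy {
    boolean canRestart();
}

final class NoRestart implements SimpleRestartStrategy {
    @Override
    public boolean canRestart() {
        return false;
    }
}

final class FixedDelay implements SimpleRestartStrategy {
    private final int maxAttempts;
    private final long delayMillis;
    private int attempts;

    FixedDelay(int maxAttempts, long delayMillis) {
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
    }

    long delayMillis() {
        return delayMillis;
    }

    /** Records an attempt; a real strategy would schedule the restart after delayMillis. */
    void recordAttempt() {
        attempts++;
    }

    @Override
    public boolean canRestart() {
        return attempts < maxAttempts;
    }
}

/** Factory-only variant: the choice happens once, so no delegating wrapper is needed. */
final class NoOrFixedIfCheckpointingEnabledFactory {
    private static final long DEFAULT_RESTART_DELAY = 0L;

    SimpleRestartStrategy createRestartStrategy(boolean isCheckpointingEnabled) {
        return isCheckpointingEnabled
                ? new FixedDelay(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY)
                : new NoRestart();
    }
}
```

The design benefit is that callers receive a concrete, already-resolved strategy, and there is no extra class whose only job is forwarding `canRestart` and `restart`.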


> Restart strategy defined in flink-conf.yaml is ignored
> --
>
> Key: FLINK-9143
> URL: https://issues.apache.org/jira/browse/FLINK-9143
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.4.2
>Reporter: Alex Smirnov
>Assignee: yuqi
>Priority: Major
>  Labels: pull-request-available
> Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>

[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542822#comment-16542822 ]

ASF GitHub Bot commented on FLINK-9143:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202304777
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -339,12 +339,28 @@ public void setSnapshotSettings(JobCheckpointingSettings settings) {
      * Gets the settings for asynchronous snapshots. This method returns null, when
      * checkpointing is not enabled.
      *
-     * @return The snapshot settings, or null, if checkpointing is not enabled.
+     * @return The snapshot settings
      */
     public JobCheckpointingSettings getCheckpointingSettings() {
         return snapshotSettings;
     }
 
+    /**
+     * Checks if the checkpointing was enabled for this job graph
+     *
+     * @return true if checkpointing enabled
+     */
+    public boolean isCheckpointingEnabled() {
+
+        if (snapshotSettings == null) {
+            return false;
+        }
+
+        long checkpointInterval = snapshotSettings.getCheckpointCoordinatorConfiguration().getCheckpointInterval();
+        return checkpointInterval > 0 &&
+            checkpointInterval < Long.MAX_VALUE;

I think that, technically, we enable checkpointing (meaning we create a 
`CheckpointCoordinator`) iff `snapshotSettings != null`. Alternatively, we 
could check `CheckpointCoordinator.isPeriodicCheckpointingConfigured` to decide 
whether checkpointing is enabled. Then we would not need to introduce this 
method, which could go out of sync with how we define whether checkpointing is 
enabled. What do you think?
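A sketch of deriving "checkpointing enabled" from a single definition, along the lines of the review comment above. `CheckpointSettingsStub` and `CheckpointingCheck` are hypothetical stand-ins; treating `Long.MAX_VALUE` as the "no periodic checkpoints" marker is an assumption that mirrors the `checkpointInterval < Long.MAX_VALUE` check in the diff.

```java
// Hypothetical stand-in for the interval carried by JobCheckpointingSettings.
final class CheckpointSettingsStub {
    private final long checkpointInterval;

    CheckpointSettingsStub(long checkpointInterval) {
        this.checkpointInterval = checkpointInterval;
    }

    // Assumption: Long.MAX_VALUE means "no periodic checkpoints configured".
    boolean isPeriodicCheckpointingConfigured() {
        return checkpointInterval != Long.MAX_VALUE;
    }
}

final class CheckpointingCheck {
    private CheckpointingCheck() {}

    // One definition, reused everywhere, so callers cannot drift apart:
    // settings must exist and a periodic interval must be configured.
    static boolean isCheckpointingEnabled(CheckpointSettingsStub settings) {
        return settings != null && settings.isPeriodicCheckpointingConfigured();
    }
}
```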


> Restart strategy defined in flink-conf.yaml is ignored
> --
>
> Key: FLINK-9143
> URL: https://issues.apache.org/jira/browse/FLINK-9143
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.4.2
>Reporter: Alex Smirnov
>Assignee: yuqi
>Priority: Major
>  Labels: pull-request-available
> Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>
> Restart strategy defined in flink-conf.yaml is disregarded when the user enables checkpointing.
> Steps to reproduce:
> 1. Download the Flink distribution (1.4.2) and update flink-conf.yaml:
>   
>  restart-strategy: none
>  state.backend: rocksdb
>  state.backend.fs.checkpointdir: file:///tmp/nfsrecovery/flink-checkpoints-metadata
>  state.backend.rocksdb.checkpointdir: file:///tmp/nfsrecovery/flink-checkpoints-rocksdb
>   
>  2. Create a new Java project as described at 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html]
>  here's the code:
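>  import java.util.Arrays;
>  
>  import org.apache.flink.api.common.functions.MapFunction;
>  import org.apache.flink.streaming.api.CheckpointingMode;
>  import org.apache.flink.streaming.api.datastream.DataStream;
>  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>  import org.slf4j.Logger;
>  import org.slf4j.LoggerFactory;
>  
>  public class FailedJob
>  {
>      static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class);
>      public static void main( String[] args ) throws Exception
>      {
>          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>          // Checkpointing is enabled, which triggers the unexpected restart behaviour.
>          env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
>          DataStream<String> stream = env.fromCollection(Arrays.asList("test"));
>          stream.map(new MapFunction<String, String>() {
>              @Override
>              public String map(String obj) {
>                  // Always fail so that the restart strategy kicks in.
>                  throw new NullPointerException("NPE");
>              }
>          });
>          env.execute("Failed job");
>      }
>  }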
>   
>  3. Compile: mvn clean package; submit it to the cluster
>   
>  4. Go to the Job Manager configuration in the WebUI and make sure the settings from flink-conf.yaml are there (screenshot attached)
>   
>  5. Go to the job's configuration and look at the Execution Configuration section
>   
>  *Expected result*: restart strategy as defined in flink-conf.yaml
>   
>  *Actual result*: Restart with fixed delay (1 ms), 2147483647 restart attempts.
>   
>   
>  See the attached screenshots and jobmanager log (lines 1 and 31)
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542821#comment-16542821
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202297327
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
 ---
@@ -149,7 +149,7 @@ public static RestartStrategyFactory createRestartStrategyFactory(Configuration
}
 
// fallback in case of an error
-   return NoRestartStrategy.createFactory(configuration);
+   return NoOrFixedIfCheckpointingEnabledRestartStrategy.createFactory(configuration);
--- End diff --

I think we should also create this factory if the `restart-strategy` 
configuration value is `"none"`, which is the default value.
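A sketch of the suggested mapping from the `restart-strategy` value to a server-side factory, assuming the checkpoint-aware factory is used both for the default `"none"` and for the error fallback. `ServerRestartFactoryKind` and `RestartConfigParsingSketch` are hypothetical labels rather than Flink types, and only the `none`, `fixed-delay` and `failure-rate` values are handled here.

```java
import java.util.Locale;

// Hypothetical labels for the server-side factory to create; not Flink types.
enum ServerRestartFactoryKind {
    NO_OR_FIXED_IF_CHECKPOINTING_ENABLED,
    FIXED_DELAY,
    FAILURE_RATE
}

final class RestartConfigParsingSketch {
    private RestartConfigParsingSketch() {}

    /** Maps a restart-strategy value from flink-conf.yaml to the factory kind to create. */
    static ServerRestartFactoryKind fromConfigValue(String restartStrategy) {
        if (restartStrategy == null) {
            // An unset value behaves like the default "none".
            return ServerRestartFactoryKind.NO_OR_FIXED_IF_CHECKPOINTING_ENABLED;
        }
        switch (restartStrategy.trim().toLowerCase(Locale.ROOT)) {
            case "none":
                // The default value also gets the checkpoint-aware factory.
                return ServerRestartFactoryKind.NO_OR_FIXED_IF_CHECKPOINTING_ENABLED;
            case "fixed-delay":
                return ServerRestartFactoryKind.FIXED_DELAY;
            case "failure-rate":
                return ServerRestartFactoryKind.FAILURE_RATE;
            default:
                // Fallback in case of an error, as in the diff under review.
                return ServerRestartFactoryKind.NO_OR_FIXED_IF_CHECKPOINTING_ENABLED;
        }
    }
}
```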



[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542819#comment-16542819
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202298725
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ---
@@ -357,6 +362,42 @@ public void testRestoringFromSavepoint() throws Exception {
}
}
 
+   /**
+* Tests that in a streaming use case where checkpointing is enabled, a
+* fixed delay with Integer.MAX_VALUE retries is instantiated if no other restart
+* strategy has been specified.
+*/
+   @Test
+   public void testAutomaticRestartingWhenCheckpointing() throws Exception {
+   // create savepoint data
+   final long savepointId = 42L;
+   final File savepointFile = createSavepoint(savepointId);
+
+   // set savepoint settings
+   final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath(
+   savepointFile.getAbsolutePath(),
+   true);
+   final JobGraph jobGraph = createJobGraphWithCheckpointing(savepointRestoreSettings);
+
+   final StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
+   final TestingCheckpointRecoveryFactory testingCheckpointRecoveryFactory = new TestingCheckpointRecoveryFactory(
+   completedCheckpointStore,
+   new StandaloneCheckpointIDCounter());
+   haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
+   final JobMaster jobMaster = createJobMaster(
+   new Configuration(),
+   jobGraph,
+   haServices,
+   new TestingJobManagerSharedServicesBuilder().build());
--- End diff --

Changing this line into
```
new TestingJobManagerSharedServicesBuilder()
	.setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
	.build()
```
will make the test fail.



[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541926#comment-16541926
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202103545
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
 ---
@@ -83,6 +86,10 @@ public void testCoordinatorShutsDownOnFailure() {

CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true),
null));
+
+   ExecutionConfig executionConfig = new ExecutionConfig();
+   executionConfig.setRestartStrategy(RestartStrategies.fallBackRestart());
--- End diff --

You're right. I'm just wondering whether you ever want to enable 
checkpointing without a restart strategy. For example, if you set 
`FallbackRestartStrategy`, enable checkpointing and set `NoRestartStrategy` as 
the server-side `RestartStrategy`, do you then want `FixedRestartStrategy` or 
`NoRestartStrategy`?

On the other hand, you might want to disable restarting for all jobs running 
on your cluster by setting the restart strategy to `NoRestartStrategy`.

Maybe the proper solution would be to set `ExecutionConfig#restartStrategy` 
to `FallbackRestartStrategy` and introduce a new default server-side restart 
strategy, `NoOrFixedIfCheckpointingEnabled`, which resolves to 
`FixedRestartStrategy` if checkpointing is enabled and to `NoRestartStrategy` 
otherwise.

What do you think?



[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541899#comment-16541899
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202098798
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
 ---
@@ -83,6 +86,10 @@ public void testCoordinatorShutsDownOnFailure() {

CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true),
null));
+
+   ExecutionConfig executionConfig = new ExecutionConfig();
+   executionConfig.setRestartStrategy(RestartStrategies.fallBackRestart());
--- End diff --

Right now null behaves a bit differently from `FallbackRestartStrategy`:
* null - allows falling back to `FixedRestartStrategy` if checkpointing is 
enabled and `noRestart` was set on the server side
* `FallbackRestartStrategy` - the server-side strategy is always used 
(regardless of checkpointing)

If we set the `FallbackStrategy` by default, we have two options:
* either we always use `FixedRestartStrategy` if checkpointing is enabled 
and `noRestart` was set on the server side,
* or we never automatically fall back to `FixedRestartStrategy`, even when 
checkpointing is enabled.

Which option do you think would be better: keep the null, always fall back to 
`FixedRestartStrategy`, or never fall back to it?



[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541797#comment-16541797
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202012704
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+
+import javax.annotation.Nullable;
+
+/**
+ * Utility method for resolving {@link RestartStrategy}.
+ */
+public final class RestartStrategyResolving {
+
+   private static final long DEFAULT_RESTART_DELAY = 0;
+
+   /**
+* Resolves which {@link RestartStrategy} to use. It should be used only on the server side.
+* The resolving strategy is as follows:
+* 
+* Strategy set within job graph.
+* Strategy set flink-conf.yaml on the server set, unless is set to {@link NoRestartStrategy} and checkpointing is enabled.
+* If no strategy was set on client and server side and checkpointing was enabled then {@link FixedDelayRestartStrategy} is used
+* 
+*
+* @param clientConfiguration    restart configuration given within the job graph
+* @param serverStrategyFactory  default server side strategy factory
+* @param isCheckpointingEnabled if checkpointing was enabled for the job
--- End diff --

Please don't align the Javadoc parameter descriptions. Whenever someone 
renames a parameter, they will be tempted to also fix the then-broken 
alignment, which is unnecessary work.



[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541795#comment-16541795
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202013103
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+
+import javax.annotation.Nullable;
+
+/**
+ * Utility method for resolving {@link RestartStrategy}.
+ */
+public final class RestartStrategyResolving {
+
+   private static final long DEFAULT_RESTART_DELAY = 0;
+
+   /**
+* Resolves which {@link RestartStrategy} to use. It should be used only on the server side.
+* The resolving strategy is as follows:
+* 
+* Strategy set within job graph.
+* Strategy set flink-conf.yaml on the server set, unless is set to {@link NoRestartStrategy} and checkpointing is enabled.
+* If no strategy was set on client and server side and checkpointing was enabled then {@link FixedDelayRestartStrategy} is used
+* 
+*
+* @param clientConfiguration    restart configuration given within the job graph
+* @param serverStrategyFactory  default server side strategy factory
+* @param isCheckpointingEnabled if checkpointing was enabled for the job
+* @return resolved strategy
+*/
+   public static RestartStrategy resolve(
+   @Nullable RestartStrategies.RestartStrategyConfiguration clientConfiguration,
+   RestartStrategyFactory serverStrategyFactory,
+   boolean isCheckpointingEnabled) {
+
+   final RestartStrategy serverSideRestartStrategy = serverStrategyFactory.createRestartStrategy();
+
+   RestartStrategy clientSideRestartStrategy = null;
--- End diff --

could be `final`
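A minimal sketch of the resolution order described in the Javadoc above, with the client-side local assigned once via a conditional expression so it can be `final`. The types (`Strategy`, `NoRestart`, `FixedDelay`, `ResolvingSketch`) are hypothetical simplifications; modelling the client configuration as a nullable `Supplier` is an assumption, and the `Integer.MAX_VALUE` attempts and zero delay are taken from the `JobMasterTest` Javadoc and `DEFAULT_RESTART_DELAY` in this PR.

```java
import java.util.function.Supplier;

// Simplified, hypothetical stand-ins for Flink's restart strategy classes.
interface Strategy {}

final class NoRestart implements Strategy {}

final class FixedDelay implements Strategy {
    final int maxAttempts;
    final long delayMillis;

    FixedDelay(int maxAttempts, long delayMillis) {
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
    }
}

final class ResolvingSketch {
    // Same value as DEFAULT_RESTART_DELAY in the diff under review.
    private static final long DEFAULT_RESTART_DELAY = 0;

    private ResolvingSketch() {}

    /**
     * @param clientConfiguration builds the strategy set in the job graph, or null if none was set
     * @param serverStrategyFactory default server side strategy factory
     * @param isCheckpointingEnabled if checkpointing was enabled for the job
     */
    static Strategy resolve(
            Supplier<Strategy> clientConfiguration,
            Supplier<Strategy> serverStrategyFactory,
            boolean isCheckpointingEnabled) {

        // Assigned exactly once, so the local can be final.
        final Strategy clientSideRestartStrategy =
                clientConfiguration == null ? null : clientConfiguration.get();

        // 1. A strategy set within the job graph has the highest priority.
        if (clientSideRestartStrategy != null) {
            return clientSideRestartStrategy;
        }

        final Strategy serverSideRestartStrategy = serverStrategyFactory.get();

        // 2./3. Server-side "no restart" becomes fixed delay when checkpointing is enabled.
        if (serverSideRestartStrategy instanceof NoRestart && isCheckpointingEnabled) {
            return new FixedDelay(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY);
        }
        return serverSideRestartStrategy;
    }
}
```

The first test case mirrors `testClientSideHighestPriority`: a client-side "no restart" wins over a server-side fixed-delay factory.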



[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541799#comment-16541799
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202065356
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolvingTest.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.time.Time;
+
+import org.junit.Test;
+
+import static org.apache.flink.api.common.restartstrategy.RestartStrategies.noRestart;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link RestartStrategyResolving}.
+ */
+public class RestartStrategyResolvingTest {
+
+   @Test
+   public void testClientSideHighestPriority() {
+
+   RestartStrategy resolvedStrategy = RestartStrategyResolving.resolve(noRestart(),
+   new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(2, 1000L),
+   true);
+
+   assertTrue(resolvedStrategy instanceof NoRestartStrategy);
--- End diff --

For the future, I would suggest using Hamcrest matchers, because they give 
better failure messages and are more expressive.


> Restart strategy defined in flink-conf.yaml is ignored
> --
>
> Key: FLINK-9143
> URL: https://issues.apache.org/jira/browse/FLINK-9143
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.4.2
>Reporter: Alex Smirnov
>Assignee: yuqi
>Priority: Major
>  Labels: pull-request-available
> Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>
> Restart strategy defined in flink-conf.yaml is disregarded, when user enables 
> checkpointing.
> Steps to reproduce:
> 1. Download flink distribution (1.4.2), update flink-conf.yaml:
>   
>  restart-strategy: none
>  state.backend: rocksdb
>  state.backend.fs.checkpointdir: 
> [file:///tmp/nfsrecovery/flink-checkpoints-metadata]
>  state.backend.rocksdb.checkpointdir: 
> [file:///tmp/nfsrecovery/flink-checkpoints-rocksdb]
>   
>  2. create new java project as described at 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html]
>  here's the code:
>  public class FailedJob
>  {
>      static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class);
>      public static void main( String[] args ) throws Exception
>      {
>          final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>          env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
>          DataStream stream = 
> env.fromCollection(Arrays.asList("test"));
>          stream.map(new MapFunction(){
>              @Override
>              public String map(String obj)
> {                 throw new NullPointerException("NPE");             }
>  
>          });
>          env.execute("Failed job");
>      }
>  }
>   
>  3. Compile: mvn clean package; submit it to the cluster
>   
>  4. Go to Job Manager configuration in WebUI, ensure settings from 
> flink-conf.yaml is there (screenshot attached)
>   
>  5. Go to Job's configuration, see Execution Configuration section
>   
>  *Expected result*: restart strategy as defined in flink-conf.yaml
>   
>  *Actual result*: Restart with fixed delay (1 ms). #2147483647 restart 
> attempts.
>   
>   
>  See the attached screenshots and jobmanager log (lines 1 and 31).
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541800#comment-16541800
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202071547
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+
+import javax.annotation.Nullable;
+
+/**
+ * Utility method for resolving {@link RestartStrategy}.
+ */
+public final class RestartStrategyResolving {
+
+   private static final long DEFAULT_RESTART_DELAY = 0;
+
+   /**
+    * Resolves which {@link RestartStrategy} to use. It should be used only on the server side.
+    * The resolving strategy is as follows:
+    *
+    * Strategy set within job graph.
+    * Strategy set flink-conf.yaml on the server set, unless is set to {@link NoRestartStrategy} and checkpointing is enabled.
+    * If no strategy was set on client and server side and checkpointing was enabled then {@link FixedDelayRestartStrategy} is used
+    *
+    *
+    * @param clientConfiguration    restart configuration given within the job graph
+    * @param serverStrategyFactory  default server side strategy factory
+    * @param isCheckpointingEnabled if checkpointing was enabled for the job
+    * @return resolved strategy
+    */
+   public static RestartStrategy resolve(
+           @Nullable RestartStrategies.RestartStrategyConfiguration clientConfiguration,
--- End diff --

By setting the default restart strategy to 
`FallbackRestartStrategyConfiguration` in the `ExecutionConfig` we could remove 
the `@Nullable` annotation here and simplify the code by avoiding the null 
checks.
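To illustrate the suggestion, here is a minimal, self-contained sketch of a never-null default (a sentinel object) taking the place of `@Nullable`. All types in it (`RestartConfig`, `FallbackConfig`, `ExecConfig`) and the string-based `resolve` are hypothetical stand-ins, not Flink's `ExecutionConfig` or `RestartStrategies` API.

```java
import java.util.Objects;

/** Sketch with hypothetical types: a restart config that defaults to a sentinel instead of null. */
public class FallbackDefaultSketch {

    /** Hypothetical stand-in for a client-side restart strategy configuration. */
    interface RestartConfig {
        String describe();
    }

    /** Sentinel meaning "nothing chosen on the client; defer to the cluster". */
    static final class FallbackConfig implements RestartConfig {
        static final FallbackConfig INSTANCE = new FallbackConfig();

        private FallbackConfig() {}

        @Override
        public String describe() {
            return "fallback";
        }
    }

    /** An explicit client-side choice. */
    static final class NoRestartConfig implements RestartConfig {
        @Override
        public String describe() {
            return "no-restart";
        }
    }

    /** Hypothetical execution config: the restart field starts as the sentinel and can never be null. */
    static final class ExecConfig {
        private RestartConfig restartConfig = FallbackConfig.INSTANCE;

        RestartConfig getRestartConfig() {
            return restartConfig;
        }

        void setRestartConfig(RestartConfig config) {
            this.restartConfig = Objects.requireNonNull(config, "restart config");
        }
    }

    /** No null checks: the sentinel explicitly routes to the server-side default. */
    static String resolve(RestartConfig client, String serverDefault) {
        return (client instanceof FallbackConfig) ? serverDefault : client.describe();
    }

    public static void main(String[] args) {
        ExecConfig config = new ExecConfig();
        System.out.println(resolve(config.getRestartConfig(), "server-default")); // server-default
        config.setRestartConfig(new NoRestartConfig());
        System.out.println(resolve(config.getRestartConfig(), "server-default")); // no-restart
    }
}
```

Because the field can never be null, the resolver only has to recognise the sentinel, which is the simplification the review comment is after.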



[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541801#comment-16541801
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202070710
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
 ---
@@ -83,6 +86,10 @@ public void testCoordinatorShutsDownOnFailure() {
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true),
null));
+
+   ExecutionConfig executionConfig = new ExecutionConfig();
+   executionConfig.setRestartStrategy(RestartStrategies.fallBackRestart());
--- End diff --

Should we maybe set the `FallbackRestartStrategyConfiguration` by default 
in the `ExecutionConfig`? That way, we could also simplify the resolve code.


[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541796#comment-16541796
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202065050
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolvingTest.java
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.time.Time;
+
+import org.junit.Test;
+
+import static org.apache.flink.api.common.restartstrategy.RestartStrategies.noRestart;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link RestartStrategyResolving}.
+ */
+public class RestartStrategyResolvingTest {
--- End diff --

Test classes should extend from `TestLogger` to give better test logging 
output separation.


[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541798#comment-16541798
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202070020
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
 ---
@@ -33,12 +33,11 @@
 public class RestartStrategyTest extends TestLogger {
 
/**
-* Tests that in a streaming use case where checkpointing is enabled, a
-* fixed delay with Integer.MAX_VALUE retries is instantiated if no other restart
-* strategy has been specified.
+* Tests that in a streaming use case where checkpointing is enabled, there is no default strategy set on the
+* client side.
 */
@Test
-   public void testAutomaticRestartingWhenCheckpointing() throws Exception {
+   public void testNoDefaultStrategyOnClientSideWhenCheckpointing() throws Exception {
--- End diff --

Maybe `testNoDefaultStrategyOnClientSideWhenCheckpointingEnabled`


[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-07-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536604#comment-16536604
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

GitHub user dawidwys opened a pull request:

https://github.com/apache/flink/pull/6283

[FLINK-9143] Use cluster strategy if none was set on client side

## What is the purpose of the change

The goal of this PR is to enable configuring the default restart strategy in 
the server-side configuration.


## Brief change log

  * no strategy is set on the client side unless one is explicitly specified
  * on the server side, the strategy is resolved from the client configuration 
first, then from the server-side configuration; if no strategy is set on the 
client side, `NoRestartStrategy` is set on the server side, and checkpointing is 
enabled, it falls back to `FixedDelayStrategy`
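The resolution order in the change log (and in the `RestartStrategyResolving` Javadoc) can be modelled roughly as below. This is a simplified sketch with hypothetical types, not the code in the pull request: the server-side strategy is passed as a value rather than a factory, and the `Integer.MAX_VALUE` attempt count for the fallback is an assumption borrowed from the previous client-side default; only the zero delay matches `DEFAULT_RESTART_DELAY` in the diff.

```java
import java.util.Optional;

/** Simplified model of the resolution order described in the PR (hypothetical types). */
public class RestartResolutionSketch {

    enum Kind { NO_RESTART, FIXED_DELAY }

    static final class Strategy {
        final Kind kind;
        final int maxAttempts;
        final long delayMs;

        Strategy(Kind kind, int maxAttempts, long delayMs) {
            this.kind = kind;
            this.maxAttempts = maxAttempts;
            this.delayMs = delayMs;
        }

        static Strategy noRestart() {
            return new Strategy(Kind.NO_RESTART, 0, 0L);
        }

        static Strategy fixedDelay(int maxAttempts, long delayMs) {
            return new Strategy(Kind.FIXED_DELAY, maxAttempts, delayMs);
        }

        @Override
        public String toString() {
            return kind + "(attempts=" + maxAttempts + ", delayMs=" + delayMs + ")";
        }
    }

    /** Mirrors DEFAULT_RESTART_DELAY = 0 from the diff. */
    static final long DEFAULT_RESTART_DELAY = 0L;

    /**
     * 1. A strategy set on the client (in the job graph) wins.
     * 2. Otherwise the server-side strategy is used,
     * 3. unless it is "no restart" and checkpointing is enabled: then fixed delay.
     */
    static Strategy resolve(Optional<Strategy> clientStrategy, Strategy serverStrategy,
                            boolean checkpointingEnabled) {
        if (clientStrategy.isPresent()) {
            return clientStrategy.get();
        }
        if (serverStrategy.kind == Kind.NO_RESTART && checkpointingEnabled) {
            // Attempt count is an assumption (the old client-side default).
            return Strategy.fixedDelay(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY);
        }
        return serverStrategy;
    }

    public static void main(String[] args) {
        Strategy serverFixed = Strategy.fixedDelay(3, 10_000L);
        System.out.println(resolve(Optional.empty(), serverFixed, true));
        // FIXED_DELAY(attempts=3, delayMs=10000)
        System.out.println(resolve(Optional.empty(), Strategy.noRestart(), true));
        // FIXED_DELAY(attempts=2147483647, delayMs=0)
        System.out.println(resolve(Optional.of(Strategy.noRestart()), serverFixed, true));
        // NO_RESTART(attempts=0, delayMs=0)
    }
}
```

In this model a server-side no-restart strategy is treated the same whether it was configured explicitly or left unset, which is how the rule reads as written.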

## Verifying this change

This change added tests and can be verified as follows:
  - RestartStrategyResolvingTest.java
  - tests using cluster pass


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dawidwys/flink FLINK-9143

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6283.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6283


commit 8efded9c1e1a555edd7733b35d9f1f49f8cc7304
Author: Dawid Wysakowicz 
Date:   2018-07-05T11:48:23Z

[FLINK-9143] Use cluster strategy if none was set on client side




[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-06-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516530#comment-16516530
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user yuqi1129 commented on the issue:

https://github.com/apache/flink/pull/5846
  
@dawidwys 
Sorry, I will push my code in two days. If there are still any problems, 
any suggestions are appreciated. 
Anyway, if you have tried to fix this problem, just go ahead.



[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-06-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513780#comment-16513780
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5846
  
Hi @yuqi1129, do you still want to work on this?


[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492691#comment-16492691
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user yuqi1129 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5846#discussion_r191211627
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java 
---
@@ -896,6 +898,10 @@ public static JobGraph getJobGraph(Configuration flinkConfig, FlinkPlan optPlan,
job = gen.compileJobGraph((OptimizedPlan) optPlan);
}
 
+   //  if we disable checkpoint and do not set restart strategy, Restart strategy will be set as in flink-conf.yaml
+   //  in flip6, jobmaster do not set this conf, so we have set this conf here.
--- End diff --

You are right, I missed this point, sorry.


[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492423#comment-16492423
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5846#discussion_r191150742
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java 
---
@@ -896,6 +898,10 @@ public static JobGraph getJobGraph(Configuration flinkConfig, FlinkPlan optPlan,
job = gen.compileJobGraph((OptimizedPlan) optPlan);
}
 
+   //  if we disable checkpoint and do not set restart strategy, Restart strategy will be set as in flink-conf.yaml
+   //  in flip6, jobmaster do not set this conf, so we have set this conf here.
--- End diff --

I am pretty sure the behaviour is still there, in JobMaster.java. Or am I 
wrong?

final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
    jobGraph.getSerializedExecutionConfig()
        .deserializeValue(userCodeLoader)
        .getRestartStrategy();

this.restartStrategy = (restartStrategyConfiguration != null) ?
    RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration) :
    jobManagerSharedServices.getRestartStrategyFactory().createRestartStrategy();
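A hypothetical sketch of why the server configuration was ignored before the change: according to the old `RestartStrategyTest` Javadoc, the client instantiated a fixed delay strategy with `Integer.MAX_VALUE` retries whenever checkpointing was enabled and no strategy was given, so the non-null branch of the `JobMaster` ternary always won. The types and method names here are illustrative only, not Flink classes.

```java
/** Sketch of the pre-fix behaviour with hypothetical types (not Flink's API). */
public class PreFixBehaviourSketch {

    static final class Strategy {
        final String name;
        final int maxAttempts;

        Strategy(String name, int maxAttempts) {
            this.name = name;
            this.maxAttempts = maxAttempts;
        }
    }

    /** Client side before the change: checkpointing + no explicit strategy => fixed delay. */
    static Strategy clientSideStrategy(Strategy userStrategy, boolean checkpointingEnabled) {
        if (userStrategy == null && checkpointingEnabled) {
            return new Strategy("fixed-delay", Integer.MAX_VALUE);
        }
        return userStrategy; // may be null when checkpointing is disabled
    }

    /** Mirrors the JobMaster ternary: a non-null client config wins over the server default. */
    static Strategy jobMasterStrategy(Strategy clientStrategy, Strategy serverDefault) {
        return (clientStrategy != null) ? clientStrategy : serverDefault;
    }

    public static void main(String[] args) {
        Strategy fromFlinkConf = new Strategy("none", 0);

        Strategy withCheckpointing =
            jobMasterStrategy(clientSideStrategy(null, true), fromFlinkConf);
        System.out.println(withCheckpointing.name + " " + withCheckpointing.maxAttempts);
        // fixed-delay 2147483647

        Strategy withoutCheckpointing =
            jobMasterStrategy(clientSideStrategy(null, false), fromFlinkConf);
        System.out.println(withoutCheckpointing.name); // none
    }
}
```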


> Restart strategy defined in flink-conf.yaml is ignored
> --
>
> Key: FLINK-9143
> URL: https://issues.apache.org/jira/browse/FLINK-9143
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.4.2
>Reporter: Alex Smirnov
>Assignee: yuqi
>Priority: Major
> Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>
> Restart strategy defined in flink-conf.yaml is disregarded, when user enables 
> checkpointing.
> Steps to reproduce:
> 1. Download flink distribution (1.4.2), update flink-conf.yaml:
>   
>  restart-strategy: none
>  state.backend: rocksdb
>  state.backend.fs.checkpointdir: 
> [file:///tmp/nfsrecovery/flink-checkpoints-metadata]
>  state.backend.rocksdb.checkpointdir: 
> [file:///tmp/nfsrecovery/flink-checkpoints-rocksdb]
>   
>  2. create new java project as described at 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html]
>  here's the code:
>  public class FailedJob
>  {
>      static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class);
>      public static void main( String[] args ) throws Exception
>      {
>          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>          env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
>          DataStream<String> stream = env.fromCollection(Arrays.asList("test"));
>          stream.map(new MapFunction<String, String>() {
>              @Override
>              public String map(String obj) {
>                  throw new NullPointerException("NPE");
>              }
>          });
>          env.execute("Failed job");
>      }
>  }
>   
>  3. Compile with mvn clean package and submit the job to the cluster.
>   
>  4. Go to the JobManager configuration in the web UI and check that the settings from flink-conf.yaml are there (screenshot attached).
>   
>  5. Go to the job's configuration and look at the Execution Configuration section.
>   
>  *Expected result*: restart strategy as defined in flink-conf.yaml
>   
>  *Actual result*: Restart with fixed delay (1 ms). #2147483647 restart attempts.
>   
>   
>  See the attached screenshots and jobmanager log (lines 1 and 31).
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492399#comment-16492399
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user yuqi1129 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5846#discussion_r191143764
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -255,6 +264,35 @@ public void start() throws Exception {
}
}
 
+   /**
+*
--- End diff --

OK




[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492400#comment-16492400
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user yuqi1129 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5846#discussion_r191143882
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -255,6 +264,35 @@ public void start() throws Exception {
}
}
 
+   /**
+*
+* @param job  job graph
+* @param configuration configuration in flink-conf.yaml
+* @param isClusterPoint whether this is client side or cluster site
+*/
+   public static void setJobgraphRestartStrategy(JobGraph job, Configuration configuration, boolean isClusterPoint) {
--- End diff --

Yeah, indeed this is not the proper place.




[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492398#comment-16492398
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user yuqi1129 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5846#discussion_r191143746
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java 
---
@@ -896,6 +898,10 @@ public static JobGraph getJobGraph(Configuration flinkConfig, FlinkPlan optPlan,
job = gen.compileJobGraph((OptimizedPlan) optPlan);
}
 
+   //  if we disable checkpoint and do not set restart strategy, Restart strategy will be set as in flink-conf.yaml
+   //  in flip6, jobmaster do not set this conf, so we have set this conf here.
--- End diff --

Before FLIP-6, the cluster side sets the restart strategy:
```
val restartStrategy =
  Option(jobGraph.getSerializedExecutionConfig()
    .deserializeValue(userCodeLoader)
    .getRestartStrategy())
    .map(RestartStrategyFactory.createRestartStrategy)
    .filter(p => p != null) match {
    case Some(strategy) => strategy
    case None => restartStrategyFactory.createRestartStrategy()
  }
```
in JobManager.scala.

In FLIP-6, the cluster directly uses the JobGraph deserialized from the client side.




[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492389#comment-16492389
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5846#discussion_r191133623
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -255,6 +264,35 @@ public void start() throws Exception {
}
}
 
+   /**
+*
+* @param job  job graph
+* @param configuration configuration in flink-conf.yaml
+* @param isClusterPoint whether this is client side or cluster site
+*/
+   public static void setJobgraphRestartStrategy(JobGraph job, Configuration configuration, boolean isClusterPoint) {
+
+       try {
+           ExecutionConfig config = job.getSerializedExecutionConfig().deserializeValue(
+               Dispatcher.class.getClassLoader());
+
+           if (config.getRestartStrategy() == null) {
+               RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
+                   RestartStrategyFactory.createRestartStrategyConfiguration(configuration);
+
+               //FixedDelay will be as default
+               if (restartStrategyConfiguration == null && isClusterPoint) {
--- End diff --

I think that when checkpointing is disabled, the default strategy should be `noRestart`.
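The proposed built-in default (the last resort when neither the program nor any configuration sets a strategy) can be sketched as a function of the checkpointing flag. The class and its fields are hypothetical; `Integer.MAX_VALUE` attempts mirrors the "#2147483647 restart attempts" in the report, and the restart delay is deliberately left out.

```java
// Hypothetical model of a built-in default restart strategy; not Flink's API.
final class DefaultRestartStrategy {
    final String kind;
    final int maxAttempts;

    private DefaultRestartStrategy(String kind, int maxAttempts) {
        this.kind = kind;
        this.maxAttempts = maxAttempts;
    }

    // With checkpointing enabled, retry "forever" with a fixed delay
    // (Integer.MAX_VALUE attempts); with checkpointing disabled, do not restart.
    static DefaultRestartStrategy forJob(boolean checkpointingEnabled) {
        return checkpointingEnabled
                ? new DefaultRestartStrategy("fixed-delay", Integer.MAX_VALUE)
                : new DefaultRestartStrategy("no-restart", 0);
    }
}
```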




[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492388#comment-16492388
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5846#discussion_r191133308
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java 
---
@@ -896,6 +898,10 @@ public static JobGraph getJobGraph(Configuration flinkConfig, FlinkPlan optPlan,
job = gen.compileJobGraph((OptimizedPlan) optPlan);
}
 
+   //  if we disable checkpoint and do not set restart strategy, Restart strategy will be set as in flink-conf.yaml
+   //  in flip6, jobmaster do not set this conf, so we have set this conf here.
--- End diff --

I don't get this comment. Why do you say the JobMaster does not set it?




[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492390#comment-16492390
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5846#discussion_r191140799
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -250,6 +259,35 @@ public void start() throws Exception {
}
}
 
+   /**
+*
+* @param job  job graph
+* @param configuration configuration in flink-conf.yaml
+* @param isClusterPoint whether this is client side or cluster site
+*/
+   public static void setJobgraphRestartStrategy(JobGraph job, Configuration configuration, boolean isClusterPoint) {
+
+       try {
+           ExecutionConfig config = job.getSerializedExecutionConfig().deserializeValue(
+               Dispatcher.class.getClassLoader());
--- End diff --

You should use the user-code class loader here (especially on the cluster side).
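The class loader matters because a serialized ExecutionConfig can reference user classes (for example custom global job parameters) that only the user-code class loader can see. A minimal sketch using only `java.io`, assuming the technique of overriding `ObjectInputStream#resolveClass`; `LoaderAwareObjectInputStream` and `UserCodeDeserialization` are hypothetical names, and Flink's own deserialization utilities are not reproduced here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Resolves classes through an explicit loader (e.g. the job's user-code
// class loader) instead of the loader chosen by default resolution.
final class LoaderAwareObjectInputStream extends ObjectInputStream {
    private final ClassLoader loader;

    LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
        super(in);
        this.loader = loader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        try {
            return Class.forName(desc.getName(), false, loader);
        } catch (ClassNotFoundException e) {
            // Fall back to default resolution, which also covers
            // primitive types such as int.
            return super.resolveClass(desc);
        }
    }
}

final class UserCodeDeserialization {
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data, ClassLoader userCodeLoader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new LoaderAwareObjectInputStream(
                new ByteArrayInputStream(data), userCodeLoader)) {
            return in.readObject();
        }
    }
}
```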




[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492387#comment-16492387
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5846#discussion_r191133512
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -255,6 +264,35 @@ public void start() throws Exception {
}
}
 
+   /**
+*
+* @param job  job graph
+* @param configuration configuration in flink-conf.yaml
+* @param isClusterPoint whether this is client side or cluster site
+*/
+   public static void setJobgraphRestartStrategy(JobGraph job, Configuration configuration, boolean isClusterPoint) {
--- End diff --

I don't think this is the right place, especially as it is also used on the client side. Maybe move it to `JobGraph`?




[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492386#comment-16492386
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5846#discussion_r191133405
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -255,6 +264,35 @@ public void start() throws Exception {
}
}
 
+   /**
+*
--- End diff --

Missing javadoc.




[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-05-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492356#comment-16492356
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5846
  
One comment first: please reply to review comments on GitHub; otherwise it is hard to follow the discussion. Comments from GitHub are posted to the JIRA issue automatically.

I still think your changes only affect the situation when checkpointing is disabled. If checkpointing is enabled, the default strategy is set on the client side via `ClusterClient#getJobGraph` > `StreamGraph#getJobGraph` > `StreamingJobGraphGenerator#createJobGraph` > `StreamingJobGraphGenerator#configureCheckpointing`.
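That call chain can be condensed into a small simulation showing why flink-conf.yaml is ignored once checkpointing is enabled: the client fills in a default before the cluster ever sees the job. This is a hedged sketch; the method names are hypothetical, strategies are modelled as strings, and it assumes, as described in this thread, that the cluster only falls back to its own configuration when the job graph carries no strategy.

```java
import java.util.function.Supplier;

// Hypothetical simulation of the client and cluster stages; not Flink APIs.
final class ClientThenClusterSimulation {

    // Client side (cf. configureCheckpointing): if checkpointing is enabled and
    // the program set no strategy, a fixed-delay default is filled in.
    static String clientSide(String fromProgram, boolean checkpointingEnabled) {
        if (fromProgram == null && checkpointingEnabled) {
            return "fixed-delay, " + Integer.MAX_VALUE + " attempts";
        }
        return fromProgram;
    }

    // Cluster side: flink-conf.yaml is consulted only if nothing arrived.
    static String clusterSide(String fromJobGraph, Supplier<String> clusterConfig) {
        return (fromJobGraph != null) ? fromJobGraph : clusterConfig.get();
    }

    static String effective(String fromProgram, boolean checkpointingEnabled, String clusterConfig) {
        return clusterSide(clientSide(fromProgram, checkpointingEnabled), () -> clusterConfig);
    }
}
```

With checkpointing enabled, `effective(null, true, "none")` yields the fixed-delay default instead of `none`, which matches the reported behaviour; with checkpointing disabled, the cluster value is used.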




[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-05-26 Thread yuqi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491687#comment-16491687
 ] 

yuqi commented on FLINK-9143:
-

[~dawidwys] Thanks for your reviews.
```
Did I understand this change correctly that it only affects situation when the 
checkpointing is disabled?
```
Not really. What I changed is: when the user does not set a restart strategy explicitly in the Flink code, we use the strategy set in the client configuration; if it is not set in the client configuration either, we use the cluster configuration.

```After your change the cluster configuration will never be taken into account (neither with nor without checkpointing).```

Indeed, createRestartStrategyConfiguration does not return null even when no restart strategy is set in the client configuration. I have changed the logic, and the job manager will now set the restart strategy on job submission when it is null.

In the pre-FLIP-6 mode, the JobManager already applies the cluster configuration from flink-conf.yaml, so I only need to change the JobMaster in FLIP-6.

About your suggestion, I agree on this point. The precedence could be:
1. set in code
2. client configuration
3. cluster configuration
4. default/no default.
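This four-level precedence can be expressed as an ordered list of lookups in which the first non-empty one wins. A minimal sketch, assuming each source (program code, client configuration, cluster configuration, built-in default) is modelled as a hypothetical `Supplier<Optional<String>>`; the names are illustrative and not Flink APIs.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical precedence resolver; each source returns an empty Optional
// when it does not define a restart strategy.
final class RestartStrategyPrecedence {

    @SafeVarargs
    static Optional<String> resolve(Supplier<Optional<String>>... sourcesInPriorityOrder) {
        for (Supplier<Optional<String>> source : sourcesInPriorityOrder) {
            Optional<String> candidate = source.get();
            if (candidate.isPresent()) {
                return candidate; // first source that defines a strategy wins
            }
        }
        return Optional.empty(); // "no default": the caller decides what to do
    }
}
```

Keeping the built-in default as the last supplier, rather than filling it in early on the client, would let the cluster configuration take effect whenever the program and client leave the strategy unset.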

If you have time, please review the change again.

> Restart strategy defined in flink-conf.yaml is ignored
> --
>
> Key: FLINK-9143
> URL: https://issues.apache.org/jira/browse/FLINK-9143
> Project: Flink
>  Issue Type: Bug
>  Components: Configuration
>Affects Versions: 1.4.2
>Reporter: Alex Smirnov
>Assignee: yuqi
>Priority: Major
> Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>
> Restart strategy defined in flink-conf.yaml is disregarded when the user enables 
> checkpointing.
> Steps to reproduce:
> 1. Download the Flink distribution (1.4.2) and update flink-conf.yaml:
>   
>  restart-strategy: none
>  state.backend: rocksdb
>  state.backend.fs.checkpointdir: file:///tmp/nfsrecovery/flink-checkpoints-metadata
>  state.backend.rocksdb.checkpointdir: file:///tmp/nfsrecovery/flink-checkpoints-rocksdb
>   
>  2. Create a new Java project as described at 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html]
>  Here's the code:
>  import java.util.Arrays;
>  import org.apache.flink.api.common.functions.MapFunction;
>  import org.apache.flink.streaming.api.CheckpointingMode;
>  import org.apache.flink.streaming.api.datastream.DataStream;
>  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>  import org.slf4j.Logger;
>  import org.slf4j.LoggerFactory;
>  
>  public class FailedJob
>  {
>      static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class);
>      public static void main( String[] args ) throws Exception
>      {
>          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>          // checkpointing every 5 s; no restart strategy is set in code
>          env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
>          DataStream<String> stream = env.fromCollection(Arrays.asList("test"));
>          stream.map(new MapFunction<String, String>() {
>              @Override
>              public String map(String obj) {
>                  // always fail, so the restart strategy is exercised
>                  throw new NullPointerException("NPE");
>              }
>          });
>          env.execute("Failed job");
>      }
>  }
>   
>  3. Compile: mvn clean package; submit it to the cluster
>   
>  4. Go to the Job Manager configuration in the web UI and ensure the settings 
> from flink-conf.yaml are there (screenshot attached)
>   
>  5. Go to the job's configuration and see the Execution Configuration section
>   
>  *Expected result*: restart strategy as defined in flink-conf.yaml
>   
>  *Actual result*: Restart with fixed delay (1 ms). #2147483647 restart attempts.
>   
>   
>  See the attached screenshots and the jobmanager log (lines 1 and 31).
>   





[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16487057#comment-16487057
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5846
  
@yuqi1129 Thank you for your contribution. 

Did I understand this change correctly that it only affects the situation when 
checkpointing is disabled? Currently, in this case the `RestartStrategy` is 
set from the cluster's `flink-conf.yaml`. After your change the cluster 
configuration will never be taken into account (neither with nor without 
checkpointing). Therefore I would be against merging it in its current shape, 
as it does not address the original problem but only changes one of many 
corner cases.

What do you think about the following strategy for resolving `RestartStrategy`?

1. job config
  * set in code
  * client configuration
  * (no default, even in case of checkpointing)
2. cluster config
  * cluster configuration (flink-conf)
  * default value = FixedDelayRestart (might be only in case of checkpointing)
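A minimal Java sketch of the proposed two-tier resolution, for illustration only. It is hypothetical: strategies are plain strings, and `ProposedRestartResolution`, `jobConfig` and `resolve` are made-up names, not Flink API. The client side forwards only what code or client configuration set (no default), and the cluster side falls back to flink-conf.yaml and then to a checkpointing-dependent default.

```java
import java.util.Optional;

// Hypothetical sketch of the proposed precedence; NOT Flink's actual API.
// Restart strategies are modelled as plain strings for illustration.
final class ProposedRestartResolution {

    static final String FIXED_DELAY_DEFAULT = "fixed-delay";
    static final String NO_RESTART = "none";

    // Tier 1 (job config, resolved on the client): code first, then client config.
    // Deliberately no default here, even when checkpointing is enabled.
    static Optional<String> jobConfig(Optional<String> setInCode, Optional<String> clientConfig) {
        return setInCode.isPresent() ? setInCode : clientConfig;
    }

    // Tier 2 (cluster config, resolved on the JobManager): flink-conf.yaml first,
    // then a default that depends on whether checkpointing is enabled.
    static String resolve(Optional<String> jobConfig,
                          Optional<String> clusterConfig,
                          boolean checkpointingEnabled) {
        if (jobConfig.isPresent()) {
            return jobConfig.get();
        }
        if (clusterConfig.isPresent()) {
            return clusterConfig.get();
        }
        return checkpointingEnabled ? FIXED_DELAY_DEFAULT : NO_RESTART;
    }
}
```

With the reporter's setup (nothing set in code or client configuration, `restart-strategy: none` in flink-conf.yaml, checkpointing enabled), `resolve(jobConfig(Optional.empty(), Optional.empty()), Optional.of("none"), true)` yields `"none"`, the expected result from the issue.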




[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-04-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16438710#comment-16438710
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

Github user yuqi1129 commented on the issue:

https://github.com/apache/flink/pull/5846
  
@tillrohrmann Could you take a look and check whether this change is OK? Thanks.




[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-04-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16438252#comment-16438252
 ] 

ASF GitHub Bot commented on FLINK-9143:
---

GitHub user yuqi1129 opened a pull request:

https://github.com/apache/flink/pull/5846

[FLINK-9143] [client] Restart strategy defined in flink-conf.yaml is ignored


## What is the purpose of the change

This change fixes the bug that, in FLIP-6 mode, Flink does not use the default 
restart strategy from flink-conf.yaml when no restart strategy is set explicitly 
in user code.

## Brief change log

  - In `ClusterClient.getJobGraph`, if no strategy was set in user code, we 
set the strategy as configured in flink-conf.yaml

## Verifying this change

This change adds tests and can be verified as follows:

  - Added `ClusterClientTest.testgetJobGraph` 
  - Added `FailureRateRestartStrategyTest.testFailureRateRestartStrategyConf` 
and `FixedDelayRestartStrategyTest.testFixedRestartStrategyConf`
and `FixedDelayRestartStrategyTest.testFixedRestartStrategyConf`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yuqi1129/flink confproblem

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5846.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5846


commit 7c2a8045a310e2942f4fbec3b4a2696b73fae3af
Author: hzyuqi1 
Date:   2018-04-14T05:45:06Z

[FLINK-9143] [client] Restart strategy defined in flink-conf.yaml is ignored

This closes #9143.

commit df23fbe809c2cbd025a923cd8607ec852d40b4ae
Author: hzyuqi1 
Date:   2018-04-14T05:46:21Z

Merge branch 'master' of https://github.com/apache/flink into confproblem






[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-04-13 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437604#comment-16437604
 ] 

Till Rohrmann commented on FLINK-9143:
--

The problem is that we don't necessarily have the cluster configuration when we 
submit the job. Consequently, it is a priori not possible to decide whether the 
cluster has a default restart strategy or not.

The underlying problem with the restart strategies is that the restart strategy 
is both a cluster configuration option and a job configuration option. I think 
it should only be a cluster configuration option. If we don't want to break the 
existing behavior, a solution could be to differentiate between an explicitly 
set job-specific restart strategy and the default job-specific restart strategy 
that is set when submitting a streaming job with checkpointing enabled.
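One way to read this suggestion, as a hypothetical Java sketch: the client records whether the job's restart strategy was set explicitly or filled in as the checkpointing default, and the JobManager replaces only a defaulted strategy with the cluster's configured one. `JobRestartSetting`, `explicit`, `checkpointingDefault` and `effectiveStrategy` are illustrative names, not Flink classes or methods, and strategies are plain strings.

```java
import java.util.Optional;

// Hypothetical sketch: tag the job-level restart strategy with its origin so the
// JobManager can tell an explicit user choice from the client-side default.
final class JobRestartSetting {
    final String strategy;
    final boolean explicitlySet;

    private JobRestartSetting(String strategy, boolean explicitlySet) {
        this.strategy = strategy;
        this.explicitlySet = explicitlySet;
    }

    // The user called something like setRestartStrategy(...) in code.
    static JobRestartSetting explicit(String strategy) {
        return new JobRestartSetting(strategy, true);
    }

    // What the client records when checkpointing is on and the user set nothing.
    static JobRestartSetting checkpointingDefault() {
        return new JobRestartSetting("fixed-delay", false);
    }

    // JobManager side: an explicit choice always wins; a defaulted one yields
    // to the cluster configuration if flink-conf.yaml defines a strategy.
    static String effectiveStrategy(JobRestartSetting job, Optional<String> clusterConfig) {
        if (job.explicitlySet) {
            return job.strategy;
        }
        return clusterConfig.orElse(job.strategy);
    }
}
```

Under this scheme a cluster without a configured strategy still gives checkpointed jobs fixed-delay restarts, so existing behavior is preserved, while `restart-strategy: none` in flink-conf.yaml takes effect for jobs that did not choose a strategy themselves.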



[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-04-07 Thread yuqi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16429590#comment-16429590
 ] 

yuqi commented on FLINK-9143:
-

Agreed on this point, [~Zentol]. I also wonder why Flink ignores the 
configuration file and sets the restart strategy to fixed delay whenever users 
do not set a strategy explicitly in code, regardless of what the configuration 
file says. [~StephanEwen], could you give us a hint?

In the JobManager, Flink sets the strategy according to the configuration file 
if users do not set this option; see JobManager.scala:

{code:java}
val restartStrategy =
  Option(jobGraph.getSerializedExecutionConfig()
    .deserializeValue(userCodeLoader)
    .getRestartStrategy())
    .map(RestartStrategyFactory.createRestartStrategy)
    .filter(p => p != null) match {
      case Some(strategy) => strategy
      case None => restartStrategyFactory.createRestartStrategy()
    }{code}

Back to this issue: the Flink client sets the restart strategy to fixed delay if 
none was set. As shown above, the `restartStrategy` on the JobManager is then 
never `None`, so the cluster's configured strategy is never used.
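The fallback in the Scala snippet can be mirrored in Java to make the effect concrete. This is a hypothetical sketch: `JobManagerFallback` and `jobManagerStrategy` are illustrative names, strategies are strings, and the `map(RestartStrategyFactory.createRestartStrategy)` step is omitted. The cluster-level factory is consulted only when the job carries no strategy.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical Java mirror of the JobManager fallback quoted above.
final class JobManagerFallback {

    // Same shape as Option(...).filter(_ != null) match { Some / None }:
    // the cluster factory runs only if the job brought no strategy.
    static String jobManagerStrategy(String strategyFromJob, Supplier<String> clusterFactory) {
        return Optional.ofNullable(strategyFromJob).orElseGet(clusterFactory);
    }
}
```

`jobManagerStrategy(null, () -> "none")` yields `"none"`, but once the client has filled in fixed delay, `jobManagerStrategy("fixed-delay", () -> "none")` yields `"fixed-delay"` and the flink-conf.yaml setting is never reached.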



[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-04-07 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16429285#comment-16429285
 ] 

Chesnay Schepler commented on FLINK-9143:
-

Yes, it is a bug:

{{// check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy}}

This check completely ignores the configuration file, which it shouldn't do.



[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-04-07 Thread yuqi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16429282#comment-16429282
 ] 

yuqi commented on FLINK-9143:
-

As far as I know, if you set
{code:java}
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);{code}
then by default Flink will use `fixedDelayRestart`; see below:
{code:java}
private void configureCheckpointing() {
    CheckpointConfig cfg = streamGraph.getCheckpointConfig();

    long interval = cfg.getCheckpointInterval();
    if (interval > 0) {
        // check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy
        if (streamGraph.getExecutionConfig().getRestartStrategy() == null) {
            // if the user enabled checkpointing, the default number of exec retries is infinite.
            streamGraph.getExecutionConfig().setRestartStrategy(
                RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
        }
    } else {
        // interval of max value means disable periodic checkpoint
        interval = Long.MAX_VALUE;
    }{code}
So this is not a bug. What's your opinion, [~till.rohrmann]?
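Reduced to plain Java, the client-side rule quoted above behaves roughly like the following hypothetical sketch. `ClientSideDefault` and `clientSideStrategy` are illustrative names, strategies are strings, and the real method also configures checkpointing itself. With checkpointing enabled and no strategy set in code, the job always leaves the client carrying fixed delay with `Integer.MAX_VALUE` attempts.

```java
// Hypothetical reduction of the quoted configureCheckpointing() defaulting rule.
final class ClientSideDefault {

    static String clientSideStrategy(String userStrategy, long checkpointInterval) {
        if (checkpointInterval > 0 && userStrategy == null) {
            // Mirrors fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY):
            // flink-conf.yaml is never consulted at this point.
            return "fixed-delay(" + Integer.MAX_VALUE + " attempts)";
        }
        // Unchanged otherwise; stays null when nothing was set and checkpointing is off.
        return userStrategy;
    }
}
```

This is why the attempt count in the reporter's "Actual result" is 2147483647: it is `Integer.MAX_VALUE`, injected before the job reaches the cluster.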



[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored

2018-04-06 Thread Alex Smirnov (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16428060#comment-16428060
 ] 

Alex Smirnov commented on FLINK-9143:
-

Mailing list discussion: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Restart-strategy-defined-in-flink-conf-yaml-is-ignored-td19361.html
