[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16754085#comment-16754085 ]

Robert Metzger commented on FLINK-9143:
---------------------------------------

I assume we can close this pull request, as it has been fixed by now: [https://github.com/apache/flink/pull/5846]?

> Restart strategy defined in flink-conf.yaml is ignored
> ------------------------------------------------------
>
>                 Key: FLINK-9143
>                 URL: https://issues.apache.org/jira/browse/FLINK-9143
>             Project: Flink
>          Issue Type: Bug
>          Components: Configuration
>    Affects Versions: 1.4.2
>            Reporter: Alex Smirnov
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
>         Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>
> The restart strategy defined in flink-conf.yaml is disregarded when the user enables
> checkpointing.
> Steps to reproduce:
> 1. Download the Flink distribution (1.4.2) and update flink-conf.yaml:
>
> restart-strategy: none
> state.backend: rocksdb
> state.backend.fs.checkpointdir: file:///tmp/nfsrecovery/flink-checkpoints-metadata
> state.backend.rocksdb.checkpointdir: file:///tmp/nfsrecovery/flink-checkpoints-rocksdb
>
> 2. Create a new Java project as described at
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html].
> Here's the code:
>
> import java.util.Arrays;
>
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class FailedJob
> {
>     static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class);
>
>     public static void main( String[] args ) throws Exception
>     {
>         final StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>         // Enabling checkpointing is what causes the restart strategy from flink-conf.yaml to be ignored.
>         env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
>         DataStream<String> stream =
>             env.fromCollection(Arrays.asList("test"));
>         stream.map(new MapFunction<String, String>() {
>             @Override
>             public String map(String obj)
>             {
>                 // Fail on every record so that the job's restart strategy is exercised.
>                 throw new NullPointerException("NPE");
>             }
>         });
>         env.execute("Failed job");
>     }
> }
>
> 3. Compile (mvn clean package) and submit the job to the cluster.
>
> 4. Go to the Job Manager configuration in the web UI and make sure the settings from
> flink-conf.yaml are there (screenshot attached).
>
> 5. Go to the job's configuration and look at the Execution Configuration section.
>
> *Expected result*: restart strategy as defined in flink-conf.yaml
>
> *Actual result*: Restart with fixed delay (1 ms). #2147483647 restart attempts.
>
> See the attached screenshots and jobmanager log (lines 1 and 31).

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544056#comment-16544056 ]

ASF GitHub Bot commented on FLINK-9143:
---------------------------------------

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6283
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542993#comment-16542993 ]

ASF GitHub Bot commented on FLINK-9143:
---------------------------------------

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/6283

Alright, I think I fetched the last commit as well. Once Travis gives green light, I'll merge it.
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542988#comment-16542988 ]

ASF GitHub Bot commented on FLINK-9143:
---------------------------------------

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/6283

Thanks @tillrohrmann for the review. I've pushed one more commit that fixes a test failure. It adds a proper comparison of `RestartStrategies`; otherwise `org.apache.flink.api.common.ExecutionConfigTest#testExecutionConfigSerialization` fails.
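Why a missing equality check makes a serialization test fail can be sketched in plain Java. The sketch assumes, hypothetically, that a restart strategy configuration is a small serializable value object; `FixedDelayConfig` and `SerializationRoundTrip` are illustrative names, not Flink classes. A serialize/deserialize round trip always yields a new instance, so a test that compares the original with the copy only passes if the class defines value-based `equals` and `hashCode`.

```java
import java.io.*;
import java.util.Objects;

// Hypothetical stand-in for a restart strategy configuration that is stored in an
// execution config and shipped to the cluster in serialized form.
final class FixedDelayConfig implements Serializable {
    private static final long serialVersionUID = 1L;

    final int restartAttempts;
    final long delayBetweenAttemptsMillis;

    FixedDelayConfig(int restartAttempts, long delayBetweenAttemptsMillis) {
        this.restartAttempts = restartAttempts;
        this.delayBetweenAttemptsMillis = delayBetweenAttemptsMillis;
    }

    // Value-based equality: without it, Object.equals compares references and a
    // deserialized copy is never equal to the original.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof FixedDelayConfig)) return false;
        FixedDelayConfig that = (FixedDelayConfig) o;
        return restartAttempts == that.restartAttempts
                && delayBetweenAttemptsMillis == that.delayBetweenAttemptsMillis;
    }

    // Must be consistent with equals.
    @Override
    public int hashCode() {
        return Objects.hash(restartAttempts, delayBetweenAttemptsMillis);
    }
}

class SerializationRoundTrip {
    // Serializes and deserializes a value, as a serialization test would.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        FixedDelayConfig original = new FixedDelayConfig(Integer.MAX_VALUE, 0L);
        FixedDelayConfig copy = roundTrip(original);
        System.out.println(original == copy);      // false: a different instance
        System.out.println(original.equals(copy)); // true: the same values
    }
}
```

Without the `equals` override, the second line would print `false`, which is the kind of failure the comment describes.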
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542979#comment-16542979 ]

ASF GitHub Bot commented on FLINK-9143:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202333518

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---
@@ -567,22 +560,16 @@ private void configureCheckpointing() {
 		long interval = cfg.getCheckpointInterval();
 		if (interval > 0) {
-			ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
 			// propagate the expected behaviour for checkpoint errors to task.
 			executionConfig.setFailTaskOnCheckpointError(cfg.isFailOnCheckpointingErrors());
-
-			// check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy
-			if (executionConfig.getRestartStrategy() == null) {
-				// if the user enabled checkpointing, the default number of exec retries is infinite.
-				executionConfig.setRestartStrategy(
-					RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
-			}
 		} else {
 			// interval of max value means disable periodic checkpoint
 			interval = Long.MAX_VALUE;
 		}
+
+
--- End diff --

Remove two line breaks
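The block removed in this diff explains the reported symptom. The following is a simplified, hypothetical model (plain strings instead of Flink's restart strategy objects; `RestartStrategyPrecedence` and its methods are illustrative names) of the precedence involved, assuming that a strategy stored in the job's execution config is used as-is and that the cluster-wide `restart-strategy` from flink-conf.yaml is consulted only when the job leaves it unset. Before the change, the client filled in a fixed-delay strategy whenever checkpointing was enabled, so the cluster setting was never reached.

```java
import java.util.Optional;

// Hypothetical, simplified model of where the effective restart strategy is decided.
class RestartStrategyPrecedence {

    static final String FIXED_DELAY_MAX = "fixed-delay(Integer.MAX_VALUE attempts)";

    // Old client-side behaviour: if checkpointing is enabled and the job set no
    // strategy, a fixed-delay default is written into the job's own config.
    static Optional<String> clientSideJobStrategyBeforeFix(Optional<String> userJobStrategy,
                                                           boolean checkpointingEnabled) {
        if (checkpointingEnabled && !userJobStrategy.isPresent()) {
            return Optional.of(FIXED_DELAY_MAX);
        }
        return userJobStrategy;
    }

    // Assumed cluster-side rule: a job-level strategy wins; otherwise the
    // cluster configuration (flink-conf.yaml) applies.
    static String effectiveStrategy(Optional<String> jobStrategy, String clusterStrategy) {
        return jobStrategy.orElse(clusterStrategy);
    }

    public static void main(String[] args) {
        String cluster = "none";                   // restart-strategy: none
        Optional<String> user = Optional.empty();  // job does not set a strategy

        // Before the change: the client-side default hides the cluster setting.
        System.out.println(effectiveStrategy(clientSideJobStrategyBeforeFix(user, true), cluster));
        // After the change: nothing is filled in on the client, so flink-conf.yaml applies.
        System.out.println(effectiveStrategy(user, cluster));
    }
}
```

Moving the checkpointing-dependent default out of the client is what lets the cluster configuration take effect again.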
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542921#comment-16542921 ]

ASF GitHub Bot commented on FLINK-9143:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202320783

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---
@@ -357,6 +362,42 @@ public void testRestoringFromSavepoint() throws Exception {
 		}
 	}
+	/**
+	 * Tests that in a streaming use case where checkpointing is enabled, a
+	 * fixed delay with Integer.MAX_VALUE retries is instantiated if no other restart
+	 * strategy has been specified.
+	 */
+	@Test
+	public void testAutomaticRestartingWhenCheckpointing() throws Exception {
+		// create savepoint data
+		final long savepointId = 42L;
+		final File savepointFile = createSavepoint(savepointId);
+
+		// set savepoint settings
+		final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath(
+			savepointFile.getAbsolutePath(),
+			true);
+		final JobGraph jobGraph = createJobGraphWithCheckpointing(savepointRestoreSettings);
+
+		final StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
+		final TestingCheckpointRecoveryFactory testingCheckpointRecoveryFactory = new TestingCheckpointRecoveryFactory(
+			completedCheckpointStore,
+			new StandaloneCheckpointIDCounter());
+		haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
+		final JobMaster jobMaster = createJobMaster(
+			new Configuration(),
+			jobGraph,
+			haServices,
+			new TestingJobManagerSharedServicesBuilder().build());
--- End diff --

This was caused by the default value being handled incorrectly in `RestartStrategyFactory`. Fixed now.
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542899#comment-16542899 ]

ASF GitHub Bot commented on FLINK-9143:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202319384

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -339,12 +339,28 @@ public void setSnapshotSettings(JobCheckpointingSettings settings) {
 	 * Gets the settings for asynchronous snapshots. This method returns null, when
 	 * checkpointing is not enabled.
 	 *
-	 * @return The snapshot settings, or null, if checkpointing is not enabled.
+	 * @return The snapshot settings
 	 */
 	public JobCheckpointingSettings getCheckpointingSettings() {
 		return snapshotSettings;
 	}
+	/**
+	 * Checks if the checkpointing was enabled for this job graph
+	 *
+	 * @return true if checkpointing enabled
+	 */
+	public boolean isCheckpointingEnabled() {
+
+		if (snapshotSettings == null) {
+			return false;
+		}
+
+		long checkpointInterval = snapshotSettings.getCheckpointCoordinatorConfiguration().getCheckpointInterval();
+		return checkpointInterval > 0 &&
+			checkpointInterval < Long.MAX_VALUE;
--- End diff --

I don't think that is true (about enabling checkpointing). I thought the same based on some javadocs, but it turned out that `snapshotSettings` is always set in `org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator#configureCheckpointing`. That's why I added this method. The problem with the second approach is that the `CheckpointCoordinator` is created while constructing the `ExecutionGraph`, which itself requires the restart strategy. I thought adding this method was the least invasive option.
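The difference between the two definitions of "checkpointing enabled" discussed here can be illustrated with a small, self-contained sketch. `SnapshotSettingsSketch` is a hypothetical stand-in for `JobCheckpointingSettings` that keeps only the checkpoint interval; `isCheckpointingEnabled` mirrors the logic of the method in the diff. Assuming, as the comment says, that the streaming job graph generator always sets snapshot settings and uses `Long.MAX_VALUE` as the interval when periodic checkpointing is disabled, a plain `settings != null` test would report checkpointing as enabled even for jobs that never call `enableCheckpointing`.

```java
// Hypothetical stand-in for JobCheckpointingSettings: only the interval matters here.
final class SnapshotSettingsSketch {
    final long checkpointIntervalMillis;

    SnapshotSettingsSketch(long checkpointIntervalMillis) {
        this.checkpointIntervalMillis = checkpointIntervalMillis;
    }
}

class CheckpointingEnabledCheck {

    // Same rule as JobGraph#isCheckpointingEnabled in the diff: settings must exist and
    // the interval must be positive and not the Long.MAX_VALUE "disabled" sentinel.
    static boolean isCheckpointingEnabled(SnapshotSettingsSketch settings) {
        if (settings == null) {
            return false;
        }
        long interval = settings.checkpointIntervalMillis;
        return interval > 0 && interval < Long.MAX_VALUE;
    }

    // The simpler definition: "enabled iff settings are present".
    static boolean settingsPresent(SnapshotSettingsSketch settings) {
        return settings != null;
    }

    public static void main(String[] args) {
        // A streaming job without enableCheckpointing(): settings exist, interval is the sentinel.
        SnapshotSettingsSketch periodicOff = new SnapshotSettingsSketch(Long.MAX_VALUE);
        SnapshotSettingsSketch every5s = new SnapshotSettingsSketch(5000);

        System.out.println(settingsPresent(periodicOff));        // true, although periodic checkpointing is off
        System.out.println(isCheckpointingEnabled(periodicOff)); // false
        System.out.println(isCheckpointingEnabled(every5s));     // true
    }
}
```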
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542859#comment-16542859 ]

ASF GitHub Bot commented on FLINK-9143:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202311125

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java ---
@@ -149,7 +149,7 @@ public static RestartStrategyFactory createRestartStrategyFactory(Configuration
 		}
 		// fallback in case of an error
-		return NoRestartStrategy.createFactory(configuration);
+		return NoOrFixedIfCheckpointingEnabledRestartStrategy.createFactory(configuration);
--- End diff --

I don't know why, but I assumed the `default` branch is reached when nothing was set in the config. My mistake. I've fixed it to differentiate between the case where `"none"` was set (this value is used across the documentation, so I think it should translate directly to `NoRestart`) and the case where the config was not set at all.
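The distinction described in this comment, between an explicit `none` and a missing `restart-strategy` key, can be sketched as a small resolution function. This is an illustration under assumptions, not the code of `RestartStrategyFactory`: the `Choice` enum and `RestartStrategyConfigResolution` class are hypothetical, only the `none`, `fixed-delay` and `failure-rate` values are handled, and unknown values are simply rejected.

```java
import java.util.Locale;

class RestartStrategyConfigResolution {

    // Hypothetical labels for the strategy factories discussed in the review.
    enum Choice { NO_RESTART, FIXED_DELAY, FAILURE_RATE, NO_OR_FIXED_IF_CHECKPOINTING }

    // An explicit "none" maps straight to no restarts; a missing value falls back
    // to the default that depends on whether checkpointing is enabled.
    static Choice resolve(String configuredValue) {
        if (configuredValue == null || configuredValue.trim().isEmpty()) {
            return Choice.NO_OR_FIXED_IF_CHECKPOINTING;
        }
        switch (configuredValue.trim().toLowerCase(Locale.ROOT)) {
            case "none":
                return Choice.NO_RESTART;
            case "fixed-delay":
                return Choice.FIXED_DELAY;
            case "failure-rate":
                return Choice.FAILURE_RATE;
            default:
                // Assumption: this sketch rejects unknown values; the real factory may
                // handle them differently (for example as a custom factory class name).
                throw new IllegalArgumentException("Unknown restart-strategy: " + configuredValue);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(null));          // NO_OR_FIXED_IF_CHECKPOINTING
        System.out.println(resolve("none"));        // NO_RESTART
        System.out.println(resolve("fixed-delay")); // FIXED_DELAY
    }
}
```

Keeping "unset" and `none` apart means that the reporter's `restart-strategy: none` is honoured even when checkpointing is enabled.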
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542820#comment-16542820 ]

ASF GitHub Bot commented on FLINK-9143:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202297182

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoOrFixedIfCheckpointingEnabledRestartStrategy.java ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+
+/**
+ * Default restart strategy that resolves either to {@link NoRestartStrategy} or {@link FixedDelayRestartStrategy}
+ * depending if checkpointing was enabled.
+ */
+public class NoOrFixedIfCheckpointingEnabledRestartStrategy implements RestartStrategy {
+
+	private static final long DEFAULT_RESTART_DELAY = 0;
+
+	private final RestartStrategy resolvedStrategy;
+
+	/**
+	 * Creates a NoOrFixedIfCheckpointingEnabledRestartStrategyFactory instance.
+	 *
+	 * @param configuration Configuration object which is ignored
+	 * @return NoOrFixedIfCheckpointingEnabledRestartStrategyFactory instance
+	 */
+	public static NoOrFixedIfCheckpointingEnabledRestartStrategyFactory createFactory(Configuration configuration) {
+		return new NoOrFixedIfCheckpointingEnabledRestartStrategyFactory();
+	}
+
+	/**
+	 * Creates instance of NoOrFixedIfCheckpointingEnabledRestartStrategy
+	 *
+	 * @param isCheckpointingEnabled if true resolves to {@link FixedDelayRestartStrategy}
+	 * otherwise to {@link NoRestartStrategy}
+	 */
+	public NoOrFixedIfCheckpointingEnabledRestartStrategy(boolean isCheckpointingEnabled) {
+		if (isCheckpointingEnabled) {
+			resolvedStrategy = new FixedDelayRestartStrategy(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY);
+		} else {
+			resolvedStrategy = new NoRestartStrategy();
+		}
+	}
+
+	@Override
+	public boolean canRestart() {
+		return resolvedStrategy.canRestart();
+	}
+
+	@Override
+	public void restart(RestartCallback restarter, ScheduledExecutor executor) {
+		resolvedStrategy.restart(restarter, executor);
+	}
+
+	public static class NoOrFixedIfCheckpointingEnabledRestartStrategyFactory extends RestartStrategyFactory {
--- End diff --

Wouldn't it be enough to only have this restart strategy factory without the corresponding `RestartStrategy`? We could instantiate the respective strategies in the `createRestartStrategy(boolean isCheckpointingEnabled)` method.
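The factory-only alternative suggested in this comment could look roughly like the following sketch. All types are hypothetical, reduced stand-ins (the real `RestartStrategy` interface also has a `restart(...)` method, and the real factory extends `RestartStrategyFactory`); the point is that the factory picks either a no-restart strategy or a fixed-delay strategy with `Integer.MAX_VALUE` attempts and zero delay once it knows whether checkpointing is enabled, so no delegating wrapper strategy is needed.

```java
// Hypothetical, reduced interface; the real RestartStrategy also has restart(...).
interface SimpleRestartStrategy {
    boolean canRestart();
}

final class NoRestartSketch implements SimpleRestartStrategy {
    @Override
    public boolean canRestart() {
        return false;
    }
}

final class FixedDelaySketch implements SimpleRestartStrategy {
    final int maxAttempts;
    final long delayMillis;
    private int attemptsMade;

    FixedDelaySketch(int maxAttempts, long delayMillis) {
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
    }

    @Override
    public boolean canRestart() {
        return attemptsMade < maxAttempts;
    }

    // Hypothetical hook standing in for an actual restart being triggered.
    void recordAttempt() {
        attemptsMade++;
    }
}

final class NoOrFixedIfCheckpointingFactorySketch {
    private static final long DEFAULT_RESTART_DELAY = 0L;

    // Chooses the concrete strategy directly instead of wrapping and delegating.
    SimpleRestartStrategy createRestartStrategy(boolean isCheckpointingEnabled) {
        if (isCheckpointingEnabled) {
            return new FixedDelaySketch(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY);
        }
        return new NoRestartSketch();
    }

    public static void main(String[] args) {
        NoOrFixedIfCheckpointingFactorySketch factory = new NoOrFixedIfCheckpointingFactorySketch();
        System.out.println(factory.createRestartStrategy(true).canRestart());  // true
        System.out.println(factory.createRestartStrategy(false).canRestart()); // false
    }
}
```

Compared with the wrapper class in the diff, this keeps the decision in a single place and avoids forwarding every `RestartStrategy` method to a resolved delegate.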
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542822#comment-16542822 ] ASF GitHub Bot commented on FLINK-9143: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6283#discussion_r202304777 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java --- @@ -339,12 +339,28 @@ public void setSnapshotSettings(JobCheckpointingSettings settings) { * Gets the settings for asynchronous snapshots. This method returns null, when * checkpointing is not enabled. * -* @return The snapshot settings, or null, if checkpointing is not enabled. +* @return The snapshot settings */ public JobCheckpointingSettings getCheckpointingSettings() { return snapshotSettings; } + /** +* Checks if the checkpointing was enabled for this job graph +* +* @return true if checkpointing enabled +*/ + public boolean isCheckpointingEnabled() { + + if (snapshotSettings == null) { + return false; + } + + long checkpointInterval = snapshotSettings.getCheckpointCoordinatorConfiguration().getCheckpointInterval(); + return checkpointInterval > 0 && + checkpointInterval < Long.MAX_VALUE; --- End diff -- I think technically, we enable checkpointing, meaning creating a `CheckpointCoordinator`, always iff `snapshotSettings != null`. We could also say that we check the `CheckpointCoordinator.isPeriodicCheckpointingConfigured` in order to decide whether checkpointing is enabled. Then we would not need to introduce this method which could go out of sync with how we define whether checkpointing is enabled or not. What do you think? 
> Restart strategy defined in flink-conf.yaml is ignored
> --
>
> Key: FLINK-9143
> URL: https://issues.apache.org/jira/browse/FLINK-9143
> Project: Flink
> Issue Type: Bug
> Components: Configuration
> Affects Versions: 1.4.2
> Reporter: Alex Smirnov
> Assignee: yuqi
> Priority: Major
> Labels: pull-request-available
> Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>
> Restart strategy defined in flink-conf.yaml is disregarded when the user enables
> checkpointing.
> Steps to reproduce:
> 1. Download the Flink distribution (1.4.2) and update flink-conf.yaml:
>
> restart-strategy: none
> state.backend: rocksdb
> state.backend.fs.checkpointdir: file:///tmp/nfsrecovery/flink-checkpoints-metadata
> state.backend.rocksdb.checkpointdir: file:///tmp/nfsrecovery/flink-checkpoints-rocksdb
>
> 2. Create a new Java project as described at
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html
> Here's the code:
>
> import java.util.Arrays;
>
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class FailedJob
> {
>     static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class);
>
>     public static void main( String[] args ) throws Exception
>     {
>         final StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>         env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
>         DataStream<String> stream =
>             env.fromCollection(Arrays.asList("test"));
>         // Every record fails, so the job's fate depends entirely on the restart strategy.
>         stream.map(new MapFunction<String, String>() {
>             @Override
>             public String map(String obj)
>             { throw new NullPointerException("NPE"); }
>         });
>         env.execute("Failed job");
>     }
> }
>
> 3. Compile with mvn clean package and submit the job to the cluster.
>
> 4. Go to the Job Manager configuration in the web UI and check that the settings from
> flink-conf.yaml are there (screenshot attached).
>
> 5. Go to the job's configuration and look at the Execution Configuration section.
>
> *Expected result*: restart strategy as defined in flink-conf.yaml
>
> *Actual result*: Restart with fixed delay (1 ms). #2147483647 restart
> attempts.
>
>
> See the attached screenshots and the jobmanager log (lines 1 and 31).
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
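The two definitions of "checkpointing enabled" raised in the review above can disagree, which is the "could go out of sync" risk. A minimal plain-Java sketch (no Flink dependency) models them side by side. `SnapshotSettings` is a hypothetical stand-in for `JobCheckpointingSettings` that only carries the interval, and treating `Long.MAX_VALUE` as a "no periodic checkpoints" sentinel is an assumption inferred from the `< Long.MAX_VALUE` check in the diff.

```java
/**
 * Sketch only: compares two hypothetical definitions of "checkpointing enabled".
 */
class CheckpointingEnabledSketch {

    // Hypothetical stand-in for JobCheckpointingSettings; only the interval is modelled.
    static final class SnapshotSettings {
        final long checkpointInterval;

        SnapshotSettings(long checkpointInterval) {
            this.checkpointInterval = checkpointInterval;
        }
    }

    // Mirrors the check proposed in the diff: settings present and a finite, positive interval.
    static boolean enabledByInterval(SnapshotSettings settings) {
        if (settings == null) {
            return false;
        }
        long interval = settings.checkpointInterval;
        return interval > 0 && interval < Long.MAX_VALUE;
    }

    // Mirrors the reviewer's remark: a coordinator is created iff the settings are non-null.
    static boolean enabledBySettingsPresence(SnapshotSettings settings) {
        return settings != null;
    }

    public static void main(String[] args) {
        SnapshotSettings periodic = new SnapshotSettings(5000L);
        // Assumed sentinel for "settings exist, but no periodic checkpoints".
        SnapshotSettings sentinel = new SnapshotSettings(Long.MAX_VALUE);

        System.out.println(enabledByInterval(periodic) + " " + enabledBySettingsPresence(periodic)); // true true
        System.out.println(enabledByInterval(sentinel) + " " + enabledBySettingsPresence(sentinel)); // false true
        System.out.println(enabledByInterval(null) + " " + enabledBySettingsPresence(null));         // false false
    }
}
```

Only the middle case distinguishes the two definitions; deriving the answer from a single existing source of truth avoids having to keep both in sync.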
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542821#comment-16542821 ] ASF GitHub Bot commented on FLINK-9143:
---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202297327

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java ---
@@ -149,7 +149,7 @@ public static RestartStrategyFactory createRestartStrategyFactory(Configuration
         }
 
         // fallback in case of an error
-        return NoRestartStrategy.createFactory(configuration);
+        return NoOrFixedIfCheckpointingEnabledRestartStrategy.createFactory(configuration);
--- End diff --

I think we should also create this factory if the `restart-strategy` configuration value is `"none"`, which is the default value.
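To make the suggestion concrete, here is a small plain-Java sketch of mapping a configuration value to a server-side default, where both the default value `"none"` and the error fallback yield the checkpointing-aware factory. The enum and method names are hypothetical, only an illustrative subset of accepted values is modelled, and an unknown value stands in for the error path; this is not the actual `RestartStrategyFactory` code.

```java
import java.util.Locale;

/**
 * Sketch only: maps a "restart-strategy" value to a hypothetical server-side default.
 */
class RestartStrategyConfigSketch {

    // Hypothetical identifiers for the server-side factories discussed in the review.
    enum ServerDefault { NO_OR_FIXED_IF_CHECKPOINTING_ENABLED, FIXED_DELAY, FAILURE_RATE }

    static ServerDefault fromConfig(String configuredValue) {
        // A missing key is treated like the default value "none".
        String value = configuredValue == null ? "none" : configuredValue.trim().toLowerCase(Locale.ROOT);
        switch (value) {
            case "none":
                // Reviewer's suggestion: the default value yields the checkpointing-aware factory.
                return ServerDefault.NO_OR_FIXED_IF_CHECKPOINTING_ENABLED;
            case "fixed-delay":
                return ServerDefault.FIXED_DELAY;
            case "failure-rate":
                return ServerDefault.FAILURE_RATE;
            default:
                // Stands in for "fallback in case of an error" from the diff.
                return ServerDefault.NO_OR_FIXED_IF_CHECKPOINTING_ENABLED;
        }
    }

    public static void main(String[] args) {
        System.out.println(fromConfig(null));               // NO_OR_FIXED_IF_CHECKPOINTING_ENABLED
        System.out.println(fromConfig("Fixed-Delay"));      // FIXED_DELAY
        System.out.println(fromConfig("no-such-strategy")); // NO_OR_FIXED_IF_CHECKPOINTING_ENABLED
    }
}
```

One consequence worth weighing: in this sketch an explicit `restart-strategy: none`, as in the reported flink-conf.yaml, cannot be told apart from an unset key, so a checkpointed job would still be restarted.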
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542819#comment-16542819 ] ASF GitHub Bot commented on FLINK-9143:
---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202298725

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---
@@ -357,6 +362,42 @@ public void testRestoringFromSavepoint() throws Exception {
         }
     }
 
+    /**
+     * Tests that in a streaming use case where checkpointing is enabled, a
+     * fixed delay with Integer.MAX_VALUE retries is instantiated if no other restart
+     * strategy has been specified.
+     */
+    @Test
+    public void testAutomaticRestartingWhenCheckpointing() throws Exception {
+        // create savepoint data
+        final long savepointId = 42L;
+        final File savepointFile = createSavepoint(savepointId);
+
+        // set savepoint settings
+        final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath(
+            savepointFile.getAbsolutePath(),
+            true);
+        final JobGraph jobGraph = createJobGraphWithCheckpointing(savepointRestoreSettings);
+
+        final StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
+        final TestingCheckpointRecoveryFactory testingCheckpointRecoveryFactory = new TestingCheckpointRecoveryFactory(
+            completedCheckpointStore,
+            new StandaloneCheckpointIDCounter());
+        haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
+        final JobMaster jobMaster = createJobMaster(
+            new Configuration(),
+            jobGraph,
+            haServices,
+            new TestingJobManagerSharedServicesBuilder().build());
--- End diff --

Changing this line to

```
new TestingJobManagerSharedServicesBuilder()
    .setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
    .build()
```

will make the test fail.
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541926#comment-16541926 ] ASF GitHub Bot commented on FLINK-9143:
---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202103545

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java ---
@@ -83,6 +86,10 @@ public void testCoordinatorShutsDownOnFailure() {
                     CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                     true),
                 null));
+
+            ExecutionConfig executionConfig = new ExecutionConfig();
+            executionConfig.setRestartStrategy(RestartStrategies.fallBackRestart());
--- End diff --

You're right. I'm just wondering whether you would ever want to enable checkpointing without a restart strategy. In other words, if you set `FallbackRestartStrategy`, enable checkpointing and set `NoRestartStrategy` as the server-side `RestartStrategy`, do you want `FixedRestartStrategy` or `NoRestartStrategy`? On the other hand, you might want to disable restarting for all jobs running on your cluster by setting the restart strategy to `NoRestartStrategy`.

Maybe the proper solution would be to set `ExecutionConfig#restartStrategy` to `FallbackRestartStrategy` and to introduce a new default server-side restart strategy, `NoOrFixedIfCheckpointingEnabled`, which resolves to `FixedRestartStrategy` if checkpointing is enabled and to `NoRestartStrategy` otherwise. What do you think?
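The proposed `NoOrFixedIfCheckpointingEnabled` default can be sketched in a few lines of plain Java. All types here are hypothetical simplifications, not Flink's `RestartStrategy` interface; the `Integer.MAX_VALUE` attempts follow the `JobMasterTest` Javadoc in this pull request, and the zero delay mirrors `DEFAULT_RESTART_DELAY = 0` in the `RestartStrategyResolving` diff.

```java
/**
 * Sketch only: a hypothetical "no restart, or fixed delay if checkpointing is enabled" default.
 */
class NoOrFixedSketch {

    // Hypothetical, heavily simplified restart policy abstraction.
    interface RestartPolicy {
        boolean canRestart(int restartsSoFar);

        long delayMillis();
    }

    static final class NoRestart implements RestartPolicy {
        @Override public boolean canRestart(int restartsSoFar) { return false; }

        @Override public long delayMillis() { return 0L; }
    }

    static final class FixedDelay implements RestartPolicy {
        private final int maxAttempts;
        private final long delayMillis;

        FixedDelay(int maxAttempts, long delayMillis) {
            this.maxAttempts = maxAttempts;
            this.delayMillis = delayMillis;
        }

        @Override public boolean canRestart(int restartsSoFar) { return restartsSoFar < maxAttempts; }

        @Override public long delayMillis() { return delayMillis; }
    }

    // Checkpointing is a per-job property, so this choice is made per job,
    // not once when the cluster configuration is read.
    static RestartPolicy noOrFixedIfCheckpointingEnabled(boolean checkpointingEnabled) {
        return checkpointingEnabled
            ? new FixedDelay(Integer.MAX_VALUE, 0L)
            : new NoRestart();
    }

    public static void main(String[] args) {
        System.out.println(noOrFixedIfCheckpointingEnabled(true).canRestart(1_000)); // true
        System.out.println(noOrFixedIfCheckpointingEnabled(false).canRestart(0));    // false
    }
}
```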
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541899#comment-16541899 ] ASF GitHub Bot commented on FLINK-9143:
---
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202098798

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java ---
@@ -83,6 +86,10 @@ public void testCoordinatorShutsDownOnFailure() {
                     CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                     true),
                 null));
+
+            ExecutionConfig executionConfig = new ExecutionConfig();
+            executionConfig.setRestartStrategy(RestartStrategies.fallBackRestart());
--- End diff --

Right now, null behaves a bit differently from `FallbackRestartStrategy`:

* null - allows falling back to `FixedRestartStrategy` if checkpointing is enabled and `noRestart` was set on the server side
* `FallbackRestartStrategy` - the server-side strategy is always used (regardless of checkpointing)

If we set the `FallbackStrategy` by default, we have two options:

* we either always use `FixedRestartStrategy` if checkpointing is enabled and `noRestart` was set on the server side,
* or we never automatically fall back to `FixedRestartStrategy`, even if checkpointing is enabled.

Which option do you think is better: keep the null, always fall back to `FixedRestartStrategy`, or never fall back to it?
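The difference between the two "unset" flavours described above can be pinned down with a small sketch. Strategies are reduced to two hypothetical enum values, and a `null` reference models a client that set nothing; the rules encode the two bullets of the comment, not the final Flink implementation.

```java
/**
 * Sketch only: null versus FALLBACK as the client-side "nothing chosen" value.
 */
class NullVersusFallbackSketch {

    // Hypothetical, simplified strategy kinds; real strategies carry parameters.
    enum Strategy { NO_RESTART, FIXED_DELAY }

    // Hypothetical client-side setting; a null reference models "nothing was set".
    enum ClientSetting { FALLBACK, NO_RESTART, FIXED_DELAY }

    static Strategy resolve(ClientSetting client, Strategy server, boolean checkpointingEnabled) {
        if (client == null) {
            // null: the server-side strategy applies, except that "no restart" on the
            // server is upgraded to fixed delay when checkpointing is enabled.
            if (checkpointingEnabled && server == Strategy.NO_RESTART) {
                return Strategy.FIXED_DELAY;
            }
            return server;
        }
        switch (client) {
            case FALLBACK:
                // FALLBACK: always the server-side strategy, regardless of checkpointing.
                return server;
            case NO_RESTART:
                return Strategy.NO_RESTART;
            case FIXED_DELAY:
                return Strategy.FIXED_DELAY;
            default:
                throw new IllegalArgumentException("Unknown setting: " + client);
        }
    }

    public static void main(String[] args) {
        // Same server setting (no restart), checkpointing on: the two flavours diverge.
        System.out.println(resolve(null, Strategy.NO_RESTART, true));                   // FIXED_DELAY
        System.out.println(resolve(ClientSetting.FALLBACK, Strategy.NO_RESTART, true)); // NO_RESTART
    }
}
```

Under the first option, the `FALLBACK` branch would adopt the null branch's rule; under the second, the null branch would simply return `server`.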
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541797#comment-16541797 ] ASF GitHub Bot commented on FLINK-9143:
---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202012704

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+
+import javax.annotation.Nullable;
+
+/**
+ * Utility method for resolving {@link RestartStrategy}.
+ */
+public final class RestartStrategyResolving {
+
+    private static final long DEFAULT_RESTART_DELAY = 0;
+
+    /**
+     * Resolves which {@link RestartStrategy} to use. It should be used only on the server side.
+     * The resolving strategy is as follows:
+     *
+     * Strategy set within job graph.
+     * Strategy set flink-conf.yaml on the server set, unless is set to {@link NoRestartStrategy} and checkpointing is enabled.
+     * If no strategy was set on client and server side and checkpointing was enabled then {@link FixedDelayRestartStrategy} is used
+     *
+     *
+     * @param clientConfiguration    restart configuration given within the job graph
+     * @param serverStrategyFactory  default server side strategy factory
+     * @param isCheckpointingEnabled if checkpointing was enabled for the job
--- End diff --

Please don't align the Javadoc strings. The problem is that whenever someone changes the name of a parameter, they will be tempted to also fix the then-misaligned indentation, which is unnecessary work.
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541795#comment-16541795 ] ASF GitHub Bot commented on FLINK-9143:
---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202013103

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+
+import javax.annotation.Nullable;
+
+/**
+ * Utility method for resolving {@link RestartStrategy}.
+ */
+public final class RestartStrategyResolving {
+
+    private static final long DEFAULT_RESTART_DELAY = 0;
+
+    /**
+     * Resolves which {@link RestartStrategy} to use. It should be used only on the server side.
+     * The resolving strategy is as follows:
+     *
+     * Strategy set within job graph.
+     * Strategy set flink-conf.yaml on the server set, unless is set to {@link NoRestartStrategy} and checkpointing is enabled.
+     * If no strategy was set on client and server side and checkpointing was enabled then {@link FixedDelayRestartStrategy} is used
+     *
+     *
+     * @param clientConfiguration    restart configuration given within the job graph
+     * @param serverStrategyFactory  default server side strategy factory
+     * @param isCheckpointingEnabled if checkpointing was enabled for the job
+     * @return resolved strategy
+     */
+    public static RestartStrategy resolve(
+            @Nullable RestartStrategies.RestartStrategyConfiguration clientConfiguration,
+            RestartStrategyFactory serverStrategyFactory,
+            boolean isCheckpointingEnabled) {
+
+        final RestartStrategy serverSideRestartStrategy = serverStrategyFactory.createRestartStrategy();
+
+        RestartStrategy clientSideRestartStrategy = null;
--- End diff --

This could be `final`.
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541799#comment-16541799 ] ASF GitHub Bot commented on FLINK-9143:
---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202065356

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolvingTest.java ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.time.Time;
+
+import org.junit.Test;
+
+import static org.apache.flink.api.common.restartstrategy.RestartStrategies.noRestart;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link RestartStrategyResolving}.
+ */
+public class RestartStrategyResolvingTest {
+
+    @Test
+    public void testClientSideHighestPriority() {
+
+        RestartStrategy resolvedStrategy = RestartStrategyResolving.resolve(noRestart(),
+            new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(2, 1000L),
+            true);
+
+        assertTrue(resolvedStrategy instanceof NoRestartStrategy);
--- End diff --

In the future, I would suggest using Hamcrest matchers, because they give better failure messages and are more expressive.
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541800#comment-16541800 ] ASF GitHub Bot commented on FLINK-9143:
---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6283#discussion_r202071547

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+
+import javax.annotation.Nullable;
+
+/**
+ * Utility method for resolving {@link RestartStrategy}.
+ */
+public final class RestartStrategyResolving {
+
+    private static final long DEFAULT_RESTART_DELAY = 0;
+
+    /**
+     * Resolves which {@link RestartStrategy} to use. It should be used only on the server side.
+     * The resolving strategy is as follows:
+     *
+     * Strategy set within job graph.
+     * Strategy set flink-conf.yaml on the server set, unless is set to {@link NoRestartStrategy} and checkpointing is enabled.
+     * If no strategy was set on client and server side and checkpointing was enabled then {@link FixedDelayRestartStrategy} is used
+     *
+     *
+     * @param clientConfiguration    restart configuration given within the job graph
+     * @param serverStrategyFactory  default server side strategy factory
+     * @param isCheckpointingEnabled if checkpointing was enabled for the job
+     * @return resolved strategy
+     */
+    public static RestartStrategy resolve(
+            @Nullable RestartStrategies.RestartStrategyConfiguration clientConfiguration,
--- End diff --

By making `FallbackRestartStrategyConfiguration` the default restart strategy in the `ExecutionConfig`, we could remove the `@Nullable` annotation here and simplify the code by avoiding the null checks.
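A sketch of the simplification being proposed: if the job-side configuration starts out as a non-null "fallback" value, `resolve` needs neither `@Nullable` nor null checks. `JobConfig`, `ClientSetting` and `Strategy` are hypothetical stand-ins for `ExecutionConfig`, `RestartStrategyConfiguration` and `RestartStrategy`, and the server default is assumed to be passed in already resolved.

```java
import java.util.Objects;

/**
 * Sketch only: a non-null "fallback" default removes the need for null handling in resolve().
 */
class NonNullDefaultSketch {

    enum Strategy { NO_RESTART, FIXED_DELAY }

    enum ClientSetting { FALLBACK, NO_RESTART, FIXED_DELAY }

    // Hypothetical stand-in for ExecutionConfig: the field starts at FALLBACK instead of null.
    static final class JobConfig {
        private ClientSetting restartSetting = ClientSetting.FALLBACK;

        void setRestartSetting(ClientSetting setting) {
            // Rejecting null keeps the "never null" invariant that resolve() relies on.
            this.restartSetting = Objects.requireNonNull(setting, "setting");
        }

        ClientSetting getRestartSetting() {
            return restartSetting;
        }
    }

    // No @Nullable parameter and no null branch: FALLBACK simply defers to the server default.
    static Strategy resolve(ClientSetting client, Strategy serverDefault) {
        switch (client) {
            case NO_RESTART:
                return Strategy.NO_RESTART;
            case FIXED_DELAY:
                return Strategy.FIXED_DELAY;
            case FALLBACK:
            default:
                return serverDefault;
        }
    }

    public static void main(String[] args) {
        JobConfig config = new JobConfig();
        System.out.println(resolve(config.getRestartSetting(), Strategy.FIXED_DELAY)); // FIXED_DELAY
        config.setRestartSetting(ClientSetting.NO_RESTART);
        System.out.println(resolve(config.getRestartSetting(), Strategy.FIXED_DELAY)); // NO_RESTART
    }
}
```

Where the "fixed delay if checkpointing is enabled" decision then lives is a separate question; in this sketch it would have to be folded into the `serverDefault` argument.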
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541801#comment-16541801 ]
ASF GitHub Bot commented on FLINK-9143:
---
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6283#discussion_r202070710
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java ---
@@ -83,6 +86,10 @@ public void testCoordinatorShutsDownOnFailure() {
                 CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                 true),
             null));
+
+        ExecutionConfig executionConfig = new ExecutionConfig();
+        executionConfig.setRestartStrategy(RestartStrategies.fallBackRestart());
--- End diff --
Should we maybe set the `FallbackRestartStrategyConfiguration` by default in the `ExecutionConfig`? That way, we could also simplify the resolve code.
> Restart strategy defined in flink-conf.yaml is ignored
> --
>
> Key: FLINK-9143
> URL: https://issues.apache.org/jira/browse/FLINK-9143
> Project: Flink
> Issue Type: Bug
> Components: Configuration
> Affects Versions: 1.4.2
> Reporter: Alex Smirnov
> Assignee: yuqi
> Priority: Major
> Labels: pull-request-available
> Attachments: execution_config.png, jobmanager.log, jobmanager.png
>
>
> The restart strategy defined in flink-conf.yaml is disregarded when the user enables
> checkpointing.
> Steps to reproduce:
> 1. Download the Flink distribution (1.4.2) and update flink-conf.yaml:
>
> restart-strategy: none
> state.backend: rocksdb
> state.backend.fs.checkpointdir: [file:///tmp/nfsrecovery/flink-checkpoints-metadata]
> state.backend.rocksdb.checkpointdir: [file:///tmp/nfsrecovery/flink-checkpoints-rocksdb]
>
> 2. Create a new Java project as described at
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/quickstart/java_api_quickstart.html]
> Here's the code:
>
> import java.util.Arrays;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class FailedJob {
>     static final Logger LOGGER = LoggerFactory.getLogger(FailedJob.class);
>
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         // Enabling checkpointing is the condition under which flink-conf.yaml is ignored.
>         env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
>         DataStream<String> stream = env.fromCollection(Arrays.asList("test"));
>         // Every record fails, so the job's restart strategy kicks in immediately.
>         stream.map(new MapFunction<String, String>() {
>             @Override
>             public String map(String obj) {
>                 throw new NullPointerException("NPE");
>             }
>         });
>         env.execute("Failed job");
>     }
> }
>
> 3. Compile with mvn clean package and submit the job to the cluster.
>
> 4. Go to the Job Manager configuration in the WebUI and make sure the settings from
> flink-conf.yaml are there (screenshot attached).
>
> 5. Go to the job's configuration and look at the Execution Configuration section.
>
> *Expected result*: the restart strategy defined in flink-conf.yaml.
>
> *Actual result*: Restart with fixed delay (1 ms). #2147483647 restart attempts.
>
> See the attached screenshots and the jobmanager log (lines 1 and 31).
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
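Here is a minimal sketch of the sentinel-default idea in tillrohrmann's comment, in plain Java with no Flink dependency. `RestartConfig`, `FallbackRestartConfig`, `FixedDelayRestartConfig`, `JobSettings` and `SentinelResolver` are hypothetical stand-ins for `RestartStrategyConfiguration`, `FallbackRestartStrategyConfiguration` and `ExecutionConfig`. They are not the classes in the pull request. The point is that a never-null default lets the resolve code replace null handling with a single "is this the fallback marker?" test.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical stand-in for a restart strategy configuration (not Flink API).
interface RestartConfig {}

// Marker meaning "nothing was chosen in the job; let the cluster decide".
final class FallbackRestartConfig implements RestartConfig {
    static final FallbackRestartConfig INSTANCE = new FallbackRestartConfig();

    private FallbackRestartConfig() {}

    @Override
    public String toString() {
        return "fallback";
    }
}

final class FixedDelayRestartConfig implements RestartConfig {
    final int attempts;

    FixedDelayRestartConfig(int attempts) {
        this.attempts = attempts;
    }

    @Override
    public String toString() {
        return "fixed-delay(" + attempts + ")";
    }
}

// Plays the role of ExecutionConfig: the restart setting starts as the marker and is never null.
final class JobSettings {
    private RestartConfig restartConfig = FallbackRestartConfig.INSTANCE;

    RestartConfig getRestartConfig() {
        return restartConfig;
    }

    void setRestartConfig(RestartConfig restartConfig) {
        this.restartConfig = Objects.requireNonNull(restartConfig);
    }
}

final class SentinelResolver {
    // Only the marker defers to the cluster default; any explicit job setting wins.
    static RestartConfig resolve(JobSettings job, Supplier<RestartConfig> clusterDefault) {
        RestartConfig fromJob = job.getRestartConfig();
        return (fromJob instanceof FallbackRestartConfig) ? clusterDefault.get() : fromJob;
    }
}
```

With this shape, a job that never touches its restart settings ships the marker to the cluster, and the cluster-side default applies without any null checks.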
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541796#comment-16541796 ]
ASF GitHub Bot commented on FLINK-9143:
---
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6283#discussion_r202065050
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolvingTest.java ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.time.Time;
+
+import org.junit.Test;
+
+import static org.apache.flink.api.common.restartstrategy.RestartStrategies.noRestart;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link RestartStrategyResolving}.
+ */
+public class RestartStrategyResolvingTest {
--- End diff --
Test classes should extend from `TestLogger` to give better test logging output separation.
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541798#comment-16541798 ]
ASF GitHub Bot commented on FLINK-9143:
---
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6283#discussion_r202070020
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java ---
@@ -33,12 +33,11 @@ public class RestartStrategyTest extends TestLogger {
     /**
-     * Tests that in a streaming use case where checkpointing is enabled, a
-     * fixed delay with Integer.MAX_VALUE retries is instantiated if no other restart
-     * strategy has been specified.
+     * Tests that in a streaming use case where checkpointing is enabled, there is no default strategy set on the
+     * client side.
      */
     @Test
-    public void testAutomaticRestartingWhenCheckpointing() throws Exception {
+    public void testNoDefaultStrategyOnClientSideWhenCheckpointing() throws Exception {
--- End diff --
Maybe `testNoDefaultStrategyOnClientSideWhenCheckpointingEnabled`
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16536604#comment-16536604 ]
ASF GitHub Bot commented on FLINK-9143:
---
GitHub user dawidwys opened a pull request:
https://github.com/apache/flink/pull/6283
[FLINK-9143] Use cluster strategy if none was set on client side
## What is the purpose of the change
The goal of this PR is to enable configuring the default restart strategy from the server-side config.
## Brief change log
* no strategy is set on the client side unless one is explicitly specified
* on the server side, the strategy is resolved based on, in order: the client configuration, the server-side configuration, and a fallback to `FixedDelayStrategy` when checkpointing is enabled, no strategy is set on the client side, and `NoRestartStrategy` is set on the server side
## Verifying this change
This change added tests and can be verified as follows:
- RestartStrategyResolvingTest.java
- tests using cluster pass
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
- The S3 file system connector: (yes / **no** / don't know)
## Documentation
- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/dawidwys/flink FLINK-9143
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6283.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #6283
commit 8efded9c1e1a555edd7733b35d9f1f49f8cc7304
Author: Dawid Wysakowicz
Date: 2018-07-05T11:48:23Z
[FLINK-9143] Use cluster strategy if none was set on client side
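The resolution order in the change log can be sketched as a pure function. This is a hypothetical model, not the code in the pull request. Strategies are reduced to an enum (`RestartKind`, an illustrative name), and `Optional.empty()` stands for "not set". The sketch also makes one assumption that goes beyond the PR text: the server can tell an absent `restart-strategy` key apart from an explicit `restart-strategy: none`. Under that assumption, the reporter's explicit `none` is honored, and the fixed-delay fallback applies only when nothing is configured anywhere and checkpointing is enabled.

```java
import java.util.Optional;

// Hypothetical strategy kinds; not Flink's RestartStrategyConfiguration classes.
enum RestartKind { NO_RESTART, FIXED_DELAY, FAILURE_RATE }

final class RestartResolutionSketch {

    /**
     * @param clientSide           strategy set explicitly in the job, empty if none
     * @param serverSide           restart-strategy from flink-conf.yaml, empty if the key is absent (assumption)
     * @param checkpointingEnabled whether the job enables checkpointing
     */
    static RestartKind resolve(Optional<RestartKind> clientSide,
                               Optional<RestartKind> serverSide,
                               boolean checkpointingEnabled) {
        // 1. An explicit client-side choice always wins.
        if (clientSide.isPresent()) {
            return clientSide.get();
        }
        // 2. Otherwise the cluster configuration applies, including an explicit "none".
        if (serverSide.isPresent()) {
            return serverSide.get();
        }
        // 3. Nothing configured anywhere: fixed delay with checkpointing, no restart without.
        return checkpointingEnabled ? RestartKind.FIXED_DELAY : RestartKind.NO_RESTART;
    }
}
```

Take the reporter's setup: checkpointing on, `restart-strategy: none` in flink-conf.yaml, and nothing set in the job. In this model, `resolve(Optional.empty(), Optional.of(RestartKind.NO_RESTART), true)` returns `NO_RESTART`.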
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516530#comment-16516530 ]
ASF GitHub Bot commented on FLINK-9143:
---
Github user yuqi1129 commented on the issue:
https://github.com/apache/flink/pull/5846
@dawidwys Sorry, I will push my code in two days. If there are still any problems, any suggestions are appreciated. Anyway, if you have already tried to fix this problem, just go ahead.
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513780#comment-16513780 ]
ASF GitHub Bot commented on FLINK-9143:
---
Github user dawidwys commented on the issue:
https://github.com/apache/flink/pull/5846
Hi @yuqi1129, do you still want to work on this?
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492691#comment-16492691 ]
ASF GitHub Bot commented on FLINK-9143:
---
Github user yuqi1129 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5846#discussion_r191211627
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---
@@ -896,6 +898,10 @@ public static JobGraph getJobGraph(Configuration flinkConfig, FlinkPlan optPlan,
             job = gen.compileJobGraph((OptimizedPlan) optPlan);
         }
 
+        // if we disable checkpoint and do not set restart strategy, Restart strategy will be set as in flink-conf.yaml
+        // in flip6, jobmaster do not set this conf, so we have set this conf here.
--- End diff --
You are right, I missed this point, sorry.
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492423#comment-16492423 ]
ASF GitHub Bot commented on FLINK-9143:
---
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/5846#discussion_r191150742
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---
@@ -896,6 +898,10 @@ public static JobGraph getJobGraph(Configuration flinkConfig, FlinkPlan optPlan,
             job = gen.compileJobGraph((OptimizedPlan) optPlan);
         }
 
+        // if we disable checkpoint and do not set restart strategy, Restart strategy will be set as in flink-conf.yaml
+        // in flip6, jobmaster do not set this conf, so we have set this conf here.
--- End diff --
I am pretty sure the behaviour is still there, in JobMaster.java. Or am I wrong?
    final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
        jobGraph.getSerializedExecutionConfig()
            .deserializeValue(userCodeLoader)
            .getRestartStrategy();

    this.restartStrategy = (restartStrategyConfiguration != null) ?
        RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration) :
        jobManagerSharedServices.getRestartStrategyFactory().createRestartStrategy();
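The following hypothetical model shows why flink-conf.yaml was ignored. It uses plain strings and no Flink classes. It covers the two steps discussed in this thread. Before the fix, the client filled in a fixed-delay strategy with `Integer.MAX_VALUE` attempts whenever checkpointing was enabled and no strategy was set. The server-side null check in the JobMaster.java snippet consults the cluster configuration only when the job graph carries no strategy. The class and method names are illustrative assumptions.

```java
// Hypothetical model of the pre-fix behaviour; strategies are plain descriptions, not Flink classes.
final class PreFixRestartSketch {

    // Client side before the fix: checkpointing without an explicit strategy gets a fixed-delay default.
    static String clientSide(String explicitStrategy, boolean checkpointingEnabled) {
        if (explicitStrategy == null && checkpointingEnabled) {
            return "fixed-delay, " + Integer.MAX_VALUE + " attempts";
        }
        return explicitStrategy; // may still be null
    }

    // Server side: same shape as the JobMaster null check, the job graph's value wins if present.
    static String serverSide(String fromJobGraph, String fromFlinkConf) {
        return (fromJobGraph != null) ? fromJobGraph : fromFlinkConf;
    }

    public static void main(String[] args) {
        String effective = serverSide(clientSide(null, true), "none");
        System.out.println(effective); // prints: fixed-delay, 2147483647 attempts
    }
}
```

When checkpointing is on, the client never leaves the field empty, so the server never reaches the `none` from flink-conf.yaml. This matches the 2147483647 restart attempts in the report.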
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492399#comment-16492399 ]
ASF GitHub Bot commented on FLINK-9143:
---
Github user yuqi1129 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5846#discussion_r191143764
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
@@ -255,6 +264,35 @@ public void start() throws Exception {
         }
     }
 
+    /**
+     *
--- End diff --
OK
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492400#comment-16492400 ]
ASF GitHub Bot commented on FLINK-9143:
---
Github user yuqi1129 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5846#discussion_r191143882
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
@@ -255,6 +264,35 @@ public void start() throws Exception {
         }
     }
 
+    /**
+     *
+     * @param job job graph
+     * @param configuration configuration in flink-conf.yaml
+     * @param isClusterPoint whether this is client side or cluster site
+     */
+    public static void setJobgraphRestartStrategy(JobGraph job, Configuration configuration, boolean isClusterPoint) {
--- End diff --
Hmm, indeed this is not the proper place.
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492398#comment-16492398 ]
ASF GitHub Bot commented on FLINK-9143:
---
Github user yuqi1129 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5846#discussion_r191143746
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---
@@ -896,6 +898,10 @@ public static JobGraph getJobGraph(Configuration flinkConfig, FlinkPlan optPlan,
             job = gen.compileJobGraph((OptimizedPlan) optPlan);
         }
 
+        // if we disable checkpoint and do not set restart strategy, Restart strategy will be set as in flink-conf.yaml
+        // in flip6, jobmaster do not set this conf, so we have set this conf here.
--- End diff --
Before FLIP-6, the cluster side set the restart strategy:
```
val restartStrategy = Option(jobGraph.getSerializedExecutionConfig()
    .deserializeValue(userCodeLoader)
    .getRestartStrategy())
  .map(RestartStrategyFactory.createRestartStrategy)
  .filter(p => p != null) match {
    case Some(strategy) => strategy
    case None => restartStrategyFactory.createRestartStrategy()
  }
```
in JobManager.scala. In FLIP-6, the cluster directly uses the JobGraph deserialized from the client side.
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492389#comment-16492389 ]

ASF GitHub Bot commented on FLINK-9143:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5846#discussion_r191133623

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
@@ -255,6 +264,35 @@ public void start() throws Exception {
         }
     }
 
+    /**
+    *
+    * @param job job graph
+    * @param configuration configuration in flink-conf.yaml
+    * @param isClusterPoint whether this is client side or cluster site
+    */
+    public static void setJobgraphRestartStrategy(JobGraph job, Configuration configuration, boolean isClusterPoint) {
+
+        try {
+            ExecutionConfig config = job.getSerializedExecutionConfig().deserializeValue(
+                Dispatcher.class.getClassLoader());
+
+            if (config.getRestartStrategy() == null) {
+                RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
+                    RestartStrategyFactory.createRestartStrategyConfiguration(configuration);
+
+                //FixedDelay will be as default
+                if (restartStrategyConfiguration == null && isClusterPoint) {
--- End diff --

I think that when checkpointing is disabled, the default strategy should be `noRestart`.
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492388#comment-16492388 ]

ASF GitHub Bot commented on FLINK-9143:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5846#discussion_r191133308

--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---
@@ -896,6 +898,10 @@ public static JobGraph getJobGraph(Configuration flinkConfig, FlinkPlan optPlan,
             job = gen.compileJobGraph((OptimizedPlan) optPlan);
         }
 
+        // if we disable checkpoint and do not set restart strategy, Restart strategy will be set as in flink-conf.yaml
+        // in flip6, jobmaster do not set this conf, so we have set this conf here.
--- End diff --

I don't get this comment. Why do you say the JobMaster does not set it?
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492390#comment-16492390 ]

ASF GitHub Bot commented on FLINK-9143:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5846#discussion_r191140799

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
@@ -250,6 +259,35 @@ public void start() throws Exception {
         }
     }
 
+    /**
+    *
+    * @param job job graph
+    * @param configuration configuration in flink-conf.yaml
+    * @param isClusterPoint whether this is client side or cluster site
+    */
+    public static void setJobgraphRestartStrategy(JobGraph job, Configuration configuration, boolean isClusterPoint) {
+
+        try {
+            ExecutionConfig config = job.getSerializedExecutionConfig().deserializeValue(
+                Dispatcher.class.getClassLoader());
--- End diff --

You should use the user-code class loader here (especially on the cluster side).
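To illustrate why the class loader matters: Java deserialization resolves every class it encounters through some class loader, and the framework's own loader (what `Dispatcher.class.getClassLoader()` returns) generally cannot see classes that exist only in the user's job jar. A common plain-Java pattern is an `ObjectInputStream` whose `resolveClass` asks a supplied loader first. The sketch below is a standalone illustration under that assumption, not Flink's implementation; in the PR itself the fix would simply be passing the user-code loader to `deserializeValue(...)`. `UserClassLoaderDeserialization` and its helpers are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical sketch: deserialize a value through an explicitly supplied class loader.
public class UserClassLoaderDeserialization {

    /** ObjectInputStream that resolves classes through a supplied class loader. */
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            try {
                // Ask the supplied loader (e.g. the job's user-code loader) first, without initializing the class.
                return Class.forName(desc.getName(), false, classLoader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup, which also covers primitive types such as "int".
                return super.resolveClass(desc);
            }
        }
    }

    public static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static Object deserialize(byte[] data, ClassLoader userClassLoader) {
        try (ObjectInputStream in =
                     new ClassLoaderObjectInputStream(new ByteArrayInputStream(data), userClassLoader)) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class not visible to the supplied class loader", e);
        }
    }

    public static void main(String[] args) {
        // On a cluster this would be the job's user-code class loader; here the context loader stands in for it.
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        System.out.println(deserialize(serialize("fixed-delay"), loader)); // prints "fixed-delay"
    }
}
```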
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492387#comment-16492387 ]

ASF GitHub Bot commented on FLINK-9143:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5846#discussion_r191133512

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
@@ -255,6 +264,35 @@ public void start() throws Exception {
         }
     }
 
+    /**
+    *
+    * @param job job graph
+    * @param configuration configuration in flink-conf.yaml
+    * @param isClusterPoint whether this is client side or cluster site
+    */
+    public static void setJobgraphRestartStrategy(JobGraph job, Configuration configuration, boolean isClusterPoint) {
--- End diff --

I don't think this is the right place, especially as it is also used on the client side. Maybe move it to `JobGraph`?
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492386#comment-16492386 ]

ASF GitHub Bot commented on FLINK-9143:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5846#discussion_r191133405

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
@@ -255,6 +264,35 @@ public void start() throws Exception {
         }
     }
 
+    /**
+    *
--- End diff --

Missing Javadoc.
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492356#comment-16492356 ]

ASF GitHub Bot commented on FLINK-9143:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5846

One comment first: please reply to review comments on GitHub, otherwise it is hard to follow the discussion. Comments from GitHub are posted to the JIRA issue automatically.

I still think your changes only affect the situation when checkpointing is disabled. If checkpointing is enabled, the default strategy is set on the client side via:
`ClusterClient#getJobGraph` -> `StreamGraph#getJobGraph` -> `StreamingJobGraphGenerator#createJobGraph` -> `StreamingJobGraphGenerator#configureCheckpointing`.
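The effect described in this comment can be reproduced with a small model. In this hypothetical sketch (none of these classes or methods are Flink APIs), the client fills in a fixed-delay strategy with `Integer.MAX_VALUE` attempts whenever checkpointing is enabled and the user set nothing, which matches the 2147483647 restart attempts reported in the issue. The cluster-side fallback only applies while the strategy is still unset, so `restart-strategy: none` from flink-conf.yaml is never consulted once checkpointing is on.

```java
// Hypothetical model of the bug: a client-side default masks the cluster's flink-conf.yaml.
public class CheckpointingDefaultMasksClusterConfig {

    /** Stand-in for the job's ExecutionConfig; a null restartStrategy means "not set". */
    static final class JobConfig {
        String restartStrategy;
        boolean checkpointingEnabled;
    }

    // Client side: models the default applied while building the job graph with checkpointing on.
    static void configureCheckpointingOnClient(JobConfig job) {
        if (job.checkpointingEnabled && job.restartStrategy == null) {
            job.restartStrategy = "fixed-delay(attempts=" + Integer.MAX_VALUE + ")";
        }
    }

    // Cluster side: use the job's strategy if present, otherwise the value from flink-conf.yaml.
    static String resolveOnCluster(JobConfig job, String clusterRestartStrategy) {
        return job.restartStrategy != null ? job.restartStrategy : clusterRestartStrategy;
    }

    /** Simulates submitting a job whose user code sets no restart strategy. */
    public static String submit(boolean checkpointing, String clusterRestartStrategy) {
        JobConfig job = new JobConfig();
        job.checkpointingEnabled = checkpointing;
        configureCheckpointingOnClient(job);
        return resolveOnCluster(job, clusterRestartStrategy);
    }

    public static void main(String[] args) {
        // The cluster's flink-conf.yaml says "restart-strategy: none".
        System.out.println(submit(false, "none")); // prints "none"
        System.out.println(submit(true, "none"));  // prints "fixed-delay(attempts=2147483647)"
    }
}
```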
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491687#comment-16491687 ]

yuqi commented on FLINK-9143:
-

[~dawidwys] Thanks for your reviews.

```
Did I understand this change correctly that it only affects situation when the checkpointing is disabled?
```
Not really. What I changed is this: when the user does not set a restart strategy explicitly in the Flink code, we use the strategy set in the client configuration; if it is not set in the client configuration either, we use the cluster configuration.

```
After your change the cluster configuration will never be taken into account (neither with nor without checkpointing).
```
Indeed, createRestartStrategyConfiguration does not return null even if no restart strategy was set in the client configuration, so I have changed the logic; the job manager now sets the strategy when the job is submitted and the restart strategy is still null. In the pre-FLIP-6 mode, the job manager already takes the cluster configuration from flink-conf.yaml into account, so I only need to change the JobMaster in FLIP-6.

About your suggestion, I agree on this point. The order might be:
1. set in code
2. client configuration
3. cluster configuration
4. default/no default

If you have time, please help to review the change again.
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16487057#comment-16487057 ]

ASF GitHub Bot commented on FLINK-9143:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5846

@yuqi1129 Thank you for your contribution. Did I understand this change correctly that it only affects situation when the checkpointing is disabled? Currently, in this case the `RestartStrategy` is set from the cluster's `flink-conf.yaml`. After your change the cluster configuration will never be taken into account (neither with nor without checkpointing). Therefore I would be against merging it in its current shape, as it does not address the original problem but only changes one of many corner cases.

What do you think about the following strategy for resolving the `RestartStrategy`?

1. job config
   * set in code
   * client configuration
   * (no default, even in case of checkpointing)
2. cluster config
   * cluster configuration (flink-conf)
   * default value = FixedDelayRestart (might be only in case of checkpointing)
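One way to read the proposed two-stage resolution as code, as a minimal sketch: the job-side step only looks at what was set in code or in the client configuration and never injects a default, while the cluster-side step falls back to flink-conf.yaml and only then to a built-in default. Making that default depend on checkpointing (fixed delay with it, no restart without it) follows the "might be only in case of checkpointing" note and the earlier review comment that `noRestart` should be the default when checkpointing is disabled. All names and strategy strings are hypothetical; `Optional.or` requires Java 9 or later.

```java
import java.util.Optional;

// Hypothetical sketch of the proposed two-stage RestartStrategy resolution (not Flink code).
public class TwoStageRestartStrategyResolution {

    /** Job side (client): set in code, then client configuration; deliberately no default. */
    public static Optional<String> resolveJobStrategy(Optional<String> setInCode,
                                                      Optional<String> clientConfig) {
        return setInCode.or(() -> clientConfig);
    }

    /** Cluster side: job strategy, then flink-conf.yaml, then a checkpointing-dependent default. */
    public static String resolveClusterStrategy(Optional<String> jobStrategy,
                                                Optional<String> clusterConfig,
                                                boolean checkpointingEnabled) {
        return jobStrategy
                .or(() -> clusterConfig)
                .orElse(checkpointingEnabled ? "fixed-delay" : "no-restart");
    }

    public static void main(String[] args) {
        Optional<String> job = resolveJobStrategy(Optional.empty(), Optional.empty());
        // Nothing set by the job, cluster says "none": the cluster value wins even with checkpointing.
        System.out.println(resolveClusterStrategy(job, Optional.of("none"), true)); // prints "none"
        // Nothing configured anywhere: fall back to the built-in default.
        System.out.println(resolveClusterStrategy(job, Optional.empty(), true));    // prints "fixed-delay"
        System.out.println(resolveClusterStrategy(job, Optional.empty(), false));   // prints "no-restart"
    }
}
```

The key design point is that the client-side step returns an empty `Optional` instead of a default, so the cluster can still tell "user chose nothing" apart from "user chose fixed delay".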
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16438710#comment-16438710 ]

ASF GitHub Bot commented on FLINK-9143:
---

Github user yuqi1129 commented on the issue:

https://github.com/apache/flink/pull/5846

@tillrohrmann Could you take a look and check whether this change is OK? Thanks.
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16438252#comment-16438252 ]

ASF GitHub Bot commented on FLINK-9143:
---

GitHub user yuqi1129 opened a pull request:

https://github.com/apache/flink/pull/5846

[FLINK-9143] [client] Restart strategy defined in flink-conf.yaml is ignored

## What is the purpose of the change

This change fixes the bug that, in FLIP-6 mode, Flink does not use the default restart strategy set in flink-conf.yaml when no restart strategy was explicitly set in the user code.

## Brief change log

- In `ClusterClient.getJobGraph`, if no strategy was set in user code, we set the restart strategy as configured in flink-conf.yaml

## Verifying this change

This change added tests and can be verified as follows:

- Add `ClusterClientTest.testgetJobGraph`
- Add `FailureRateRestartStrategyTest.testFailureRateRestartStrategyConf` and `FixedDelayRestartStrategyTest.testFixedRestartStrategyConf`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yuqi1129/flink confproblem

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5846.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5846

commit 7c2a8045a310e2942f4fbec3b4a2696b73fae3af
Author: hzyuqi1
Date: 2018-04-14T05:45:06Z

[FLINK-9143] [client] Restart strategy defined in flink-conf.yaml is ignored

This closes #9143.

commit df23fbe809c2cbd025a923cd8607ec852d40b4ae
Author: hzyuqi1
Date: 2018-04-14T05:46:21Z

Merge branch 'master' of https://github.com/apache/flink into confproblem
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16437604#comment-16437604 ] Till Rohrmann commented on FLINK-9143:

The problem is that we don't necessarily have the cluster configuration when we submit the job. Consequently, it is not possible to decide a priori whether the cluster has a default restart strategy or not. The underlying problem with the restart strategies is that the restart strategy is both a cluster configuration option and a job configuration option. I think it should only be a cluster configuration option. If we don't want to break the existing behavior, a solution could be to differentiate between an explicitly set job-specific restart strategy and the default job-specific restart strategy that is set when a streaming job is submitted with checkpointing enabled.
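The distinction Till proposes can be sketched in self-contained Java. This is a minimal illustration only: `JobRestartSetting`, `ProposedResolution` and the string-valued strategies are hypothetical names, not Flink API, and not necessarily how the eventual fix was implemented. The job carries a flag saying whether its strategy was set explicitly, and the decision is made where the cluster configuration is known (the JobManager). The final fallback to `"none"` when nothing is configured anywhere is an assumption of the sketch.

```java
import java.util.Objects;

// Hypothetical: a job-level restart strategy plus a flag recording whether
// the user set it explicitly or it was injected because checkpointing is on.
final class JobRestartSetting {
    final String strategy;   // e.g. "fixed-delay", "none"
    final boolean explicit;  // true only if the user chose it in job code

    JobRestartSetting(String strategy, boolean explicit) {
        this.strategy = Objects.requireNonNull(strategy);
        this.explicit = explicit;
    }
}

final class ProposedResolution {

    /**
     * Resolves the effective strategy on the JobManager side.
     *
     * @param job            setting shipped with the job, or null if none
     * @param clusterDefault restart-strategy from flink-conf.yaml, or null if unset
     */
    static String resolve(JobRestartSetting job, String clusterDefault) {
        if (job != null && job.explicit) {
            return job.strategy;          // an explicit user choice always wins
        }
        if (clusterDefault != null) {
            return clusterDefault;        // cluster config beats the implicit default
        }
        // assumed fallback: the injected default, otherwise no restarts
        return job != null ? job.strategy : "none";
    }

    public static void main(String[] args) {
        // Scenario from the issue: checkpointing injected fixed-delay, cluster says "none".
        System.out.println(resolve(new JobRestartSetting("fixed-delay", false), "none")); // prints none
        // A user who explicitly asks for fixed-delay still gets it.
        System.out.println(resolve(new JobRestartSetting("fixed-delay", true), "none"));  // prints fixed-delay
    }
}
```

With this split, the reporter's `restart-strategy: none` would take effect, while jobs that call a restart-strategy setter themselves keep their behavior.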
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16429590#comment-16429590 ] yuqi commented on FLINK-9143:

I agree with you on this point, [~Zentol]. I also wonder why Flink ignores the configuration file and sets the restart strategy to fixed delay if users do not set a strategy explicitly in code, regardless of what the configuration file says. [~StephanEwen], could you give us a hint?

In the JobManager, Flink sets the strategy according to the configuration file if users don't set this option; see jobmanager.scala:
{code:scala}
val restartStrategy = Option(jobGraph.getSerializedExecutionConfig()
    .deserializeValue(userCodeLoader)
    .getRestartStrategy())
  .map(RestartStrategyFactory.createRestartStrategy)
  .filter(p => p != null) match {
  case Some(strategy) => strategy
  case None => restartStrategyFactory.createRestartStrategy()
}
{code}
Back to this issue: when checkpointing is enabled and no restart strategy was set, the Flink client sets it to fixed delay. As shown above, the restartStrategy on the JobManager will then never be `None`, which causes the problem.
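The Option/match fallback quoted from jobmanager.scala can be modeled in plain Java to show why the cluster default is unreachable once the client has injected a strategy. This is a simplified sketch: the class name `JobManagerFallback` is hypothetical, and strategies are plain strings rather than the `RestartStrategy` objects the real code builds.

```java
import java.util.Optional;

// Hypothetical plain-Java model of the quoted jobmanager.scala fallback.
final class JobManagerFallback {

    /**
     * @param jobStrategy    strategy shipped in the job's ExecutionConfig, or null
     * @param clusterDefault strategy derived from flink-conf.yaml
     */
    static String resolve(String jobStrategy, String clusterDefault) {
        // Some(strategy) => strategy; None => cluster-wide default
        return Optional.ofNullable(jobStrategy).orElse(clusterDefault);
    }

    public static void main(String[] args) {
        // No job-level strategy: the cluster default is used.
        System.out.println(resolve(null, "none"));          // prints none
        // Checkpointing enabled: the client already injected fixed-delay,
        // so the None branch is never taken and "none" is ignored.
        System.out.println(resolve("fixed-delay", "none")); // prints fixed-delay
    }
}
```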
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16429285#comment-16429285 ] Chesnay Schepler commented on FLINK-9143:

Yes, it is a bug: {{// check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy}}
This check completely ignores the configuration file, which it shouldn't do.
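One way to read Chesnay's point is that the client-side check should consult the configuration before injecting its default. The sketch below is hypothetical: `ConfigAwareDefault`, `jobStrategyToShip` and the `Map`-based configuration are invented for illustration, and only the key name `restart-strategy` comes from the issue. As Till Rohrmann points out in this thread, the client does not necessarily have the cluster configuration, so `clientConf` may be incomplete in practice.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical config-aware variant of the client-side default injection.
final class ConfigAwareDefault {
    static final String KEY = "restart-strategy"; // key used in flink-conf.yaml in the issue

    /**
     * Returns the strategy to store in the job, or null to leave it unset so
     * that the JobManager falls back to the cluster configuration.
     */
    static String jobStrategyToShip(String userStrategy,
                                    boolean checkpointingEnabled,
                                    Map<String, String> clientConf) {
        if (userStrategy != null) {
            return userStrategy;                   // explicit user choice
        }
        if (checkpointingEnabled && !clientConf.containsKey(KEY)) {
            return "fixed-delay";                  // only default when nothing is configured
        }
        return null;                               // defer to the configuration
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(KEY, "none");
        System.out.println(jobStrategyToShip(null, true, conf));            // prints null
        System.out.println(jobStrategyToShip(null, true, new HashMap<>())); // prints fixed-delay
    }
}
```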
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16429282#comment-16429282 ] yuqi commented on FLINK-9143:

As far as I know, if you set
{code:java}
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
{code}
Flink will use `fixedDelayRestart` by default; see below:
{code:java}
private void configureCheckpointing() {
    CheckpointConfig cfg = streamGraph.getCheckpointConfig();

    long interval = cfg.getCheckpointInterval();
    if (interval > 0) {
        // check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy
        if (streamGraph.getExecutionConfig().getRestartStrategy() == null) {
            // if the user enabled checkpointing, the default number of exec retries is infinite.
            streamGraph.getExecutionConfig().setRestartStrategy(
                RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
        }
    } else {
        // interval of max value means disable periodic checkpoint
        interval = Long.MAX_VALUE;
    }
{code}
So this is not a bug. What's your opinion, [~till.rohrmann]?
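The quoted `configureCheckpointing()` check can be modeled in self-contained Java to show where the 2147483647 restart attempts reported in the issue come from: `Integer.MAX_VALUE` is 2147483647. Class and method names here are hypothetical. The delay is passed in as a parameter because the value of `DEFAULT_RESTART_DELAY` is not shown in the thread; the 1 ms used below merely matches the reported actual result.

```java
// Hypothetical model of the quoted configureCheckpointing() logic; not Flink's API.
final class ClientDefaulting {

    /** Simplified fixed-delay setting; a null reference means "no strategy set". */
    static final class FixedDelay {
        final int attempts;
        final long delayMs;

        FixedDelay(int attempts, long delayMs) {
            this.attempts = attempts;
            this.delayMs = delayMs;
        }
    }

    /**
     * Returns the strategy the client stores in the job. If checkpointing is
     * enabled (interval > 0) and the user set nothing, a fixed-delay strategy
     * with Integer.MAX_VALUE attempts is injected. The cluster configuration
     * is never consulted here.
     */
    static FixedDelay applyCheckpointingDefault(FixedDelay userStrategy,
                                                long checkpointIntervalMs,
                                                long defaultDelayMs) {
        if (checkpointIntervalMs > 0 && userStrategy == null) {
            return new FixedDelay(Integer.MAX_VALUE, defaultDelayMs);
        }
        return userStrategy;
    }

    public static void main(String[] args) {
        // 5000 ms interval as in FailedJob; 1 ms delay chosen to match the reported result.
        FixedDelay injected = applyCheckpointingDefault(null, 5000L, 1L);
        System.out.println(injected.attempts + " attempts, " + injected.delayMs + " ms delay");
        // prints: 2147483647 attempts, 1 ms delay
    }
}
```

Because the injected value is non-null, the JobManager-side fallback to flink-conf.yaml never runs, which matches the behavior described in the issue.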
[jira] [Commented] (FLINK-9143) Restart strategy defined in flink-conf.yaml is ignored
[ https://issues.apache.org/jira/browse/FLINK-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16428060#comment-16428060 ] Alex Smirnov commented on FLINK-9143:

Mailing list discussion: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Restart-strategy-defined-in-flink-conf-yaml-is-ignored-td19361.html