[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623752#comment-16623752 ]

ASF GitHub Bot commented on FLINK-10255:
Clark commented on issue #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#issuecomment-423567013

Thanks for the reply, that makes sense.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Standby Dispatcher locks submitted JobGraphs
>
> Key: FLINK-10255
> URL: https://issues.apache.org/jira/browse/FLINK-10255
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Affects Versions: 1.5.3, 1.6.0, 1.7.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.6.1, 1.7.0, 1.5.4
>
> Currently, standby {{Dispatchers}} lock submitted {{JobGraphs}} which are added to the {{SubmittedJobGraphStore}} if HA mode is enabled. Locking the {{JobGraphs}} can prevent their cleanup, leaving the system in an inconsistent state.
> The problem is that we recover the newly added {{JobGraph}} in the {{SubmittedJobGraphListener#onAddedJobGraph}} callback, which is also called if we don't have the leadership. Recovering the {{JobGraph}} currently locks the {{JobGraph}}. If the {{Dispatcher}} is not the leader, we won't start that job after its recovery. However, we also don't release the {{JobGraph}}, leaving it locked.
> There are two possible solutions to the problem. Either we check whether we are the leader before recovering jobs, or we say that recovering jobs does not lock them and only lock a recovered job once we can submit it. The latter approach has the advantage that it follows a code path quite similar to that of an initial job submission. Moreover, jobs are currently also recovered in other places (e.g. {{Dispatcher#grantLeadership}}). In all of these places we would currently need to release the {{JobGraphs}} if we cannot submit the recovered {{JobGraph}}.
> An extension of the first solution could be to stop the {{SubmittedJobGraphStore}} while the {{Dispatcher}} is not the leader. Then we would have to make sure that no concurrent callback from the {{SubmittedJobGraphStore#SubmittedJobGraphListener}} can be executed after revoking leadership from the {{Dispatcher}}.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
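A minimal sketch of the first option in the description (check leadership before recovering) may make the failure mode more concrete. This is plain Java, not Flink code: LockingJobGraphStore, StandbyAwareDispatcher and their methods are hypothetical stand-ins, and recovery is modelled as a synchronous call that locks the entry, as the issue describes.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for an HA job graph store whose recovery call locks the entry.
class LockingJobGraphStore {
    private final Set<String> locked = new HashSet<>();

    // Recovering a job graph locks it, mirroring the behaviour described in the issue.
    String recoverJobGraph(String jobId) {
        locked.add(jobId);
        return "graph-" + jobId;
    }

    void releaseJobGraph(String jobId) {
        locked.remove(jobId);
    }

    boolean isLocked(String jobId) {
        return locked.contains(jobId);
    }
}

// Hypothetical dispatcher that only reacts to the "job graph added" callback while it is the leader.
class StandbyAwareDispatcher {
    private final LockingJobGraphStore store;
    private final Set<String> runningJobs = new HashSet<>();
    private boolean leader;

    StandbyAwareDispatcher(LockingJobGraphStore store) {
        this.store = store;
    }

    void grantLeadership() {
        leader = true;
    }

    void revokeLeadership() {
        leader = false;
    }

    // A standby instance returns early, so it never recovers (and therefore never locks) the graph.
    void onAddedJobGraph(String jobId) {
        if (!leader || runningJobs.contains(jobId)) {
            return;
        }
        store.recoverJobGraph(jobId);
        runningJobs.add(jobId);
    }

    boolean isRunning(String jobId) {
        return runningJobs.contains(jobId);
    }
}
```

The sketch is single-threaded on purpose. In the real Dispatcher the callback, the recovery and leadership changes can interleave, which is exactly the concern raised in the last paragraph of the description and in the rest of this thread.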
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623264#comment-16623264 ]

ASF GitHub Bot commented on FLINK-10255:
tillrohrmann commented on issue #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#issuecomment-423455756

I think it should not be possible to have two async recovery operations ongoing, since one of them will have to wait for the other to complete. That was the idea of the fix.
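Till's answer suggests that each recovery is chained onto the previous recoveryOperation future; the Dispatcher hunk quoted further down calls recoveryOperation.thenApplyAsync inside runAsync, i.e. on the main thread. The following sketch illustrates that serialization pattern with plain JDK executors. SerializedRecovery, doRecover and both executors are hypothetical stand-ins for the Dispatcher, its recovery logic, the main-thread executor and the RPC service executor; it is not the code from the PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: recoveries are serialized by chaining each one onto the previous future.
// The recoveryOperation field is only read and written on a single-threaded "main thread" executor.
class SerializedRecovery {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    private final ExecutorService ioExecutor = Executors.newFixedThreadPool(4);
    private CompletableFuture<Void> recoveryOperation = CompletableFuture.completedFuture(null);

    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxInFlight = new AtomicInteger();
    final List<String> order = Collections.synchronizedList(new ArrayList<>());

    // Schedules the recovery of jobId; the returned future completes once that recovery has run.
    CompletableFuture<Void> recover(String jobId) {
        return CompletableFuture.supplyAsync(() -> {
            // Executed on the main thread, so updates of recoveryOperation never race.
            // Note: if an earlier step fails, later chained steps complete exceptionally too.
            recoveryOperation = recoveryOperation.thenRunAsync(() -> doRecover(jobId), ioExecutor);
            return recoveryOperation;
        }, mainThread).thenCompose(step -> step);
    }

    private void doRecover(String jobId) {
        maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        try {
            Thread.sleep(10); // simulate blocking I/O against the HA store
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        order.add(jobId);
        inFlight.decrementAndGet();
    }

    int maxInFlight() {
        return maxInFlight.get();
    }

    void shutdown() {
        mainThread.shutdown();
        ioExecutor.shutdown();
    }
}
```

Even though the I/O executor has four threads, at most one recovery runs at a time, because each step only starts after the future it was chained onto has completed.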
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618729#comment-16618729 ]

ASF GitHub Bot commented on FLINK-10255:
Clark commented on issue #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#issuecomment-422313111

Hi @tillrohrmann, is it possible that two async operations modify the same recoveryOperation at the same time? Would the operations still be serialized in that case?
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614802#comment-16614802 ]

ASF GitHub Bot commented on FLINK-10255:
asfgit closed pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
index 326e924b448..d8ad5aba8ea 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
@@ -22,6 +22,7 @@
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.util.Properties;
@@ -36,8 +37,8 @@
     @Nonnull
     private final SavepointRestoreSettings savepointRestoreSettings;
-    public StandaloneJobClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, int restPort, @Nonnull String jobClassName, @Nonnull SavepointRestoreSettings savepointRestoreSettings) {
-        super(configDir, dynamicProperties, args, restPort);
+    public StandaloneJobClusterConfiguration(@Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, @Nullable String hostname, int restPort, @Nonnull String jobClassName, @Nonnull SavepointRestoreSettings savepointRestoreSettings) {
+        super(configDir, dynamicProperties, args, hostname, restPort);
         this.jobClassName = jobClassName;
         this.savepointRestoreSettings = savepointRestoreSettings;
     }
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
index 3c65ba864ed..17217eff018 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
@@ -32,6 +32,7 @@
 import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.CONFIG_DIR_OPTION;
 import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION;
+import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.HOST_OPTION;
 import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.REST_PORT_OPTION;
 /**
@@ -67,6 +68,7 @@ public StandaloneJobClusterConfiguration createResult(@Nonnull CommandLine comma
         final Properties dynamicProperties = commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt());
         final String restPortString = commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1");
         final int restPort = Integer.parseInt(restPortString);
+        final String hostname = commandLine.getOptionValue(HOST_OPTION.getOpt());
         final String jobClassName = commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt());
         final SavepointRestoreSettings savepointRestoreSettings = CliFrontendParser.createSavepointRestoreSettings(commandLine);
@@ -74,6 +76,7 @@ public StandaloneJobClusterConfiguration createResult(@Nonnull CommandLine comma
             configDir,
             dynamicProperties,
             commandLine.getArgs(),
+            hostname,
             restPort,
             jobClassName,
             savepointRestoreSettings);
diff --git a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
index 5864c8a985d..6fc5b76f246 100644
--- a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
@@ -30,7 +30,7 @@
  * @param type of the thrown exception
  */
 @FunctionalInterface
-public interface BiConsumerWithException extends BiConsumer {
+public interface BiConsumerWithException {
     /**
      * Performs
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613200#comment-16613200 ]

ASF GitHub Bot commented on FLINK-10255:
tillrohrmann commented on issue #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#issuecomment-420928749

Thanks for the review @GJL. I've addressed your comments and will merge the PR once Travis gives the green light.
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613163#comment-16613163 ]

ASF GitHub Bot commented on FLINK-10255:
tillrohrmann commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217291997

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java ##
@@ -167,7 +181,7 @@ public void testSubmittedJobGraphRelease() throws Exception {
         // recover the job
         final SubmittedJobGraph submittedJobGraph = otherSubmittedJobGraphStore.recoverJobGraph(jobId);
-        assertThat(submittedJobGraph, Matchers.is(Matchers.notNullValue()));
+        assertThat(submittedJobGraph, is(Matchers.notNullValue()));

Review comment:
Good catch. Will change it.
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613162#comment-16613162 ]

ASF GitHub Bot commented on FLINK-10255:
tillrohrmann commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217291912

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ##
@@ -879,24 +917,66 @@ public void handleError(final Exception exception) {
     @Override
     public void onAddedJobGraph(final JobID jobId) {
-        final CompletableFuture recoveredJob = getRpcService().execute(
-            () -> submittedJobGraphStore.recoverJobGraph(jobId));
-
-        final CompletableFuture submissionFuture = recoveredJob.thenComposeAsync(
-            (SubmittedJobGraph submittedJobGraph) -> submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT),
-            getMainThreadExecutor());
-
-        submissionFuture.whenComplete(
-            (Acknowledge acknowledge, Throwable throwable) -> {
-                if (throwable != null) {
-                    onFatalError(
-                        new DispatcherException(
-                            String.format("Could not start the added job %s", jobId),
-                            ExceptionUtils.stripCompletionException(throwable)));
+        runAsync(
+            () -> {
+                if (!jobManagerRunners.containsKey(jobId)) {
+                    final CompletableFuture recoveredJob = recoveryOperation.thenApplyAsync(
+                        ignored -> {
+                            try {
+                                return recoverJob(jobId);
+                            } catch (Exception e) {
+                                ExceptionUtils.rethrow(e);
+                            }
+                            return null;
+                        },
+                        getRpcService().getExecutor());
+
+                    final DispatcherId dispatcherId = getFencingToken();
+                    final CompletableFuture submissionFuture = recoveredJob.thenComposeAsync(
+                        (FunctionWithThrowable, Exception>) (JobGraph jobGraph) -> tryRunRecoveredJobGraph(jobGraph, dispatcherId)
+                            .thenAcceptAsync(
+                                (ConsumerWithException) (Boolean isRecoveredJobRunning) -> {

Review comment:
Good point. I like this approach better. Will adapt the existing interfaces.
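The reworked onAddedJobGraph in the hunk above captures the fencing token (getFencingToken()) before the asynchronous recovery and hands it to tryRunRecoveredJobGraph. A minimal sketch of that idea, assuming a UUID token that is regenerated on every leadership grant. FencedRecovery and its methods are hypothetical and single-threaded, and releasing the lock when the token is stale follows the issue description rather than the truncated diff.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch, not Flink's Dispatcher: a fencing token is captured before the asynchronous
// recovery and compared again before the recovered job is started. If leadership changed in the
// meantime, the job is not started and its lock is released instead of being leaked.
class FencedRecovery {
    private UUID fencingToken; // null while this instance is not the leader
    final Set<String> lockedJobs = new HashSet<>();
    final Set<String> runningJobs = new HashSet<>();

    void grantLeadership() {
        fencingToken = UUID.randomUUID(); // a fresh token for every leadership session
    }

    void revokeLeadership() {
        fencingToken = null;
    }

    UUID getFencingToken() {
        return fencingToken;
    }

    // Stand-in for recovering (and thereby locking) a job graph from the HA store.
    void recoverJob(String jobId) {
        lockedJobs.add(jobId);
    }

    // Starts the job only if we are still the leader under the token captured before recovery.
    CompletableFuture<Boolean> tryRunRecoveredJob(String jobId, UUID expectedToken) {
        boolean stillLeader = expectedToken != null && expectedToken.equals(fencingToken);
        if (stillLeader) {
            runningJobs.add(jobId);
        }
        return CompletableFuture.completedFuture(stillLeader);
    }

    // Completes once the recovered job has either been started or had its lock released.
    CompletableFuture<Void> onRecovered(String jobId, UUID capturedToken) {
        return tryRunRecoveredJob(jobId, capturedToken).thenAccept(isRunning -> {
            if (!isRunning) {
                lockedJobs.remove(jobId);
            }
        });
    }
}
```

Comparing tokens rather than a plain "is leader" flag also catches the case where leadership was lost and regained while the recovery was in flight, since the new session carries a different token.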
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612370#comment-16612370 ]

ASF GitHub Bot commented on FLINK-10255:
GJL commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217085458

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java ##
@@ -167,7 +181,7 @@ public void testSubmittedJobGraphRelease() throws Exception {
         // recover the job
         final SubmittedJobGraph submittedJobGraph = otherSubmittedJobGraphStore.recoverJobGraph(jobId);
-        assertThat(submittedJobGraph, Matchers.is(Matchers.notNullValue()));
+        assertThat(submittedJobGraph, is(Matchers.notNullValue()));

Review comment:
You added a static import for `is` but not for `notNullValue`. I think this should be consistent.
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612362#comment-16612362 ]

ASF GitHub Bot commented on FLINK-10255:
GJL commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217082586

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ##

Review comment:
Imo we are doing this wrong.
The code would be much more readable with static factory methods:
```
/**
 * {@link Consumer} that can throw checked exceptions.
 */
@FunctionalInterface
public interface CheckedConsumer<T> {

    void checkedAccept(T t) throws Exception;

    static <T> Consumer<T> unchecked(CheckedConsumer<T> checkedConsumer) {
        return (t) -> {
            try {
                checkedConsumer.checkedAccept(t);
            } catch (Exception e) {
                ExceptionUtils.rethrow(e);
            }
        };
    }
}
```
This allows for:
```
.thenAcceptAsync(CheckedConsumer.unchecked(isRecoveredJobRunning -> {
    ...
}));
...
```
No casts are required. Also, when interacting with the Java API, it does not matter what exact type of exception can be thrown; what matters is that the checked exception becomes unchecked. We do not need to generify the exception type in `ConsumerWithException`.
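The proposal above relies on Flink's ExceptionUtils.rethrow. A self-contained, JDK-only variant of the same static-factory idea, assuming that wrapping the checked exception in a CompletionException is acceptable: CompletableFuture passes a CompletionException through unchanged, so callers of join() see the original exception as its cause. CheckedConsumerDemo and afterRecovery are hypothetical names for illustration only.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

/** {@link Consumer} that can throw checked exceptions (JDK-only variant of the proposal above). */
@FunctionalInterface
interface CheckedConsumer<T> {

    void checkedAccept(T t) throws Exception;

    // Adapts a CheckedConsumer to java.util.function.Consumer. Checked exceptions are wrapped in a
    // CompletionException here (assumption: this stands in for Flink's ExceptionUtils.rethrow).
    static <T> Consumer<T> unchecked(CheckedConsumer<T> checkedConsumer) {
        return t -> {
            try {
                checkedConsumer.checkedAccept(t);
            } catch (RuntimeException e) {
                throw e; // already unchecked, propagate as-is
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        };
    }
}

class CheckedConsumerDemo {
    // Hypothetical follow-up step that declares a checked exception.
    static void afterRecovery(boolean isRecoveredJobRunning) throws IOException {
        if (!isRecoveredJobRunning) {
            throw new IOException("release failed"); // illustrative failure only
        }
    }

    static CompletableFuture<Void> handle(CompletableFuture<Boolean> isRunning) {
        // No cast is needed: the lambda's target type is inferred as CheckedConsumer<Boolean>.
        return isRunning.thenAcceptAsync(CheckedConsumer.unchecked(running -> afterRecovery(running)));
    }
}
```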
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612351#comment-16612351 ]

ASF GitHub Bot commented on FLINK-10255:
GJL commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217082586

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ##

Review comment:
Imo we are doing this wrong.
The code would be much more readable with static factory methods:
```
/**
 * {@link Consumer} that can throw checked exceptions.
 */
@FunctionalInterface
public interface CheckedConsumer<T> {

    void checkedAccept(T t) throws Exception;

    static <T> Consumer<T> unchecked(CheckedConsumer<T> checkedConsumer) {
        return (t) -> {
            try {
                checkedConsumer.checkedAccept(t);
            } catch (Exception e) {
                ExceptionUtils.rethrow(e);
            }
        };
    }
}
```
This allows for:
```
.thenAcceptAsync(CheckedConsumer.unchecked(isRecoveredJobRunning -> {
    ...
}));
...
```
No casts are required. Also, when interacting with the Java API, it does not matter what exact type of exception can be thrown; what matters is that the checked exception becomes an unchecked one. We do not need to generify the exception type in `ConsumerWithException`.
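The same factory-method pattern could, in principle, also remove the FunctionWithThrowable cast in front of the thenComposeAsync lambda in the Dispatcher hunk. A hedged sketch with a hypothetical CheckedFunction interface and a stand-in recoverJob step, again wrapping checked exceptions in CompletionException instead of calling Flink's ExceptionUtils.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

/** Hypothetical Function counterpart of the CheckedConsumer proposal. */
@FunctionalInterface
interface CheckedFunction<T, R> {

    R checkedApply(T t) throws Exception;

    // Wraps checked exceptions in CompletionException so the function fits java.util.function.Function.
    static <T, R> Function<T, R> unchecked(CheckedFunction<T, R> checkedFunction) {
        return t -> {
            try {
                return checkedFunction.checkedApply(t);
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        };
    }
}

class CheckedFunctionDemo {
    // Hypothetical recovery step that declares a checked exception.
    static String recoverJob(String jobId) throws Exception {
        if (jobId.isEmpty()) {
            throw new Exception("unknown job");
        }
        return "graph-" + jobId;
    }

    // The lambda's parameter and return types are inferred; no cast to a functional interface is needed.
    static CompletableFuture<String> recoverAsync(CompletableFuture<String> jobId) {
        return jobId.thenComposeAsync(CheckedFunction.unchecked(
                id -> CompletableFuture.completedFuture(recoverJob(id))));
    }
}
```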
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612349#comment-16612349 ] ASF GitHub Bot commented on FLINK-10255:

GJL commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217082586

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ##

```
@@ -879,24 +917,66 @@ public void handleError(final Exception exception) {
     @Override
     public void onAddedJobGraph(final JobID jobId) {
-        final CompletableFuture recoveredJob = getRpcService().execute(
-            () -> submittedJobGraphStore.recoverJobGraph(jobId));
-
-        final CompletableFuture submissionFuture = recoveredJob.thenComposeAsync(
-            (SubmittedJobGraph submittedJobGraph) -> submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT),
-            getMainThreadExecutor());
-
-        submissionFuture.whenComplete(
-            (Acknowledge acknowledge, Throwable throwable) -> {
-                if (throwable != null) {
-                    onFatalError(
-                        new DispatcherException(
-                            String.format("Could not start the added job %s", jobId),
-                            ExceptionUtils.stripCompletionException(throwable)));
+        runAsync(
+            () -> {
+                if (!jobManagerRunners.containsKey(jobId)) {
+                    final CompletableFuture recoveredJob = recoveryOperation.thenApplyAsync(
+                        ignored -> {
+                            try {
+                                return recoverJob(jobId);
+                            } catch (Exception e) {
+                                ExceptionUtils.rethrow(e);
+                            }
+                            return null;
+                        },
+                        getRpcService().getExecutor());
+
+                    final DispatcherId dispatcherId = getFencingToken();
+                    final CompletableFuture submissionFuture = recoveredJob.thenComposeAsync(
+                        (FunctionWithThrowable, Exception>) (JobGraph jobGraph) -> tryRunRecoveredJobGraph(jobGraph, dispatcherId)
+                            .thenAcceptAsync(
+                                (ConsumerWithException) (Boolean isRecoveredJobRunning) -> {
```

Review comment: Imo we are doing this wrong.
The code would be much more readable with static factory methods:

```
/**
 * {@link Consumer} that can throw checked exceptions.
 */
@FunctionalInterface
public interface CheckedConsumer<T> {

    void checkedAccept(T t) throws Exception;

    static <T> Consumer<T> unchecked(CheckedConsumer<T> checkedConsumer) {
        return (t) -> {
            try {
                checkedConsumer.checkedAccept(t);
            } catch (Exception e) {
                ExceptionUtils.rethrow(e);
            }
        };
    }
}
```

This allows for:

```
CheckedConsumer.unchecked(isRecoveredJobRunning -> {
    ...
});
```

No casts are required. Also, when interacting with the Java API, it does not matter which exact type of exception can be thrown; what matters is that the checked exception becomes an unchecked one. We do not need to generify the exception type in `ConsumerWithException`.
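The casts criticized in this review most likely stem from an approach where the throwing interface extends `Consumer` itself: a lambda passed straight to `thenAccept` is typed against `Consumer`, whose `accept` declares no checked exceptions, so it has to be cast to the subtype first. The PR's actual `ConsumerWithException` definition is not shown in this thread, so `ThrowingConsumer` below is a hypothetical reconstruction of that pattern, not Flink code; `record` and `run` are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical cast-based variant: a Consumer subtype whose single abstract method may throw. */
@FunctionalInterface
interface ThrowingConsumer<T> extends Consumer<T> {

    void acceptWithException(T t) throws Exception;

    /** Bridges Consumer#accept to the throwing method, wrapping checked exceptions. */
    @Override
    default void accept(T t) {
        try {
            acceptWithException(t);
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

public class CastBasedConsumerDemo {

    /** Stand-in for callback work that declares a checked exception. */
    static void record(List<Boolean> sink, Boolean value) throws Exception {
        if (value == null) {
            throw new Exception("missing value");
        }
        sink.add(value);
    }

    static List<Boolean> run(boolean isRecoveredJobRunning) {
        List<Boolean> sink = new ArrayList<>();
        // Without the cast the lambda would target Consumer<? super Boolean>; its accept()
        // declares no checked exceptions, so calling record(...) would not compile.
        CompletableFuture.completedFuture(isRecoveredJobRunning)
            .thenAccept((ThrowingConsumer<Boolean>) value -> record(sink, value))
            .join();
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // [true]
    }
}
```

The static `unchecked` factory avoids this because its parameter type, not `Consumer`, is the lambda's target type.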
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612350#comment-16612350 ] ASF GitHub Bot commented on FLINK-10255:

GJL commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217082586

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ##

```
@@ -879,24 +917,66 @@ public void handleError(final Exception exception) {
     @Override
     public void onAddedJobGraph(final JobID jobId) {
-        final CompletableFuture recoveredJob = getRpcService().execute(
-            () -> submittedJobGraphStore.recoverJobGraph(jobId));
-
-        final CompletableFuture submissionFuture = recoveredJob.thenComposeAsync(
-            (SubmittedJobGraph submittedJobGraph) -> submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT),
-            getMainThreadExecutor());
-
-        submissionFuture.whenComplete(
-            (Acknowledge acknowledge, Throwable throwable) -> {
-                if (throwable != null) {
-                    onFatalError(
-                        new DispatcherException(
-                            String.format("Could not start the added job %s", jobId),
-                            ExceptionUtils.stripCompletionException(throwable)));
+        runAsync(
+            () -> {
+                if (!jobManagerRunners.containsKey(jobId)) {
+                    final CompletableFuture recoveredJob = recoveryOperation.thenApplyAsync(
+                        ignored -> {
+                            try {
+                                return recoverJob(jobId);
+                            } catch (Exception e) {
+                                ExceptionUtils.rethrow(e);
+                            }
+                            return null;
+                        },
+                        getRpcService().getExecutor());
+
+                    final DispatcherId dispatcherId = getFencingToken();
+                    final CompletableFuture submissionFuture = recoveredJob.thenComposeAsync(
+                        (FunctionWithThrowable, Exception>) (JobGraph jobGraph) -> tryRunRecoveredJobGraph(jobGraph, dispatcherId)
+                            .thenAcceptAsync(
+                                (ConsumerWithException) (Boolean isRecoveredJobRunning) -> {
```

Review comment: Imo we are doing this wrong.
The code would be much more readable with static factory methods:

```
/**
 * {@link Consumer} that can throw checked exceptions.
 */
@FunctionalInterface
public interface CheckedConsumer<T> {

    void checkedAccept(T t) throws Exception;

    static <T> Consumer<T> unchecked(CheckedConsumer<T> checkedConsumer) {
        return (t) -> {
            try {
                checkedConsumer.checkedAccept(t);
            } catch (Exception e) {
                ExceptionUtils.rethrow(e);
            }
        };
    }
}
```

This allows for:

```
.thenAcceptAsync(CheckedConsumer.unchecked(isRecoveredJobRunning -> {
    ...
}));
```

No casts are required. Also, when interacting with the Java API, it does not matter which exact type of exception can be thrown; what matters is that the checked exception becomes an unchecked one. We do not need to generify the exception type in `ConsumerWithException`.
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612347#comment-16612347 ] ASF GitHub Bot commented on FLINK-10255:

GJL commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217082586

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ##

```
@@ -879,24 +917,66 @@ public void handleError(final Exception exception) {
     @Override
     public void onAddedJobGraph(final JobID jobId) {
-        final CompletableFuture recoveredJob = getRpcService().execute(
-            () -> submittedJobGraphStore.recoverJobGraph(jobId));
-
-        final CompletableFuture submissionFuture = recoveredJob.thenComposeAsync(
-            (SubmittedJobGraph submittedJobGraph) -> submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT),
-            getMainThreadExecutor());
-
-        submissionFuture.whenComplete(
-            (Acknowledge acknowledge, Throwable throwable) -> {
-                if (throwable != null) {
-                    onFatalError(
-                        new DispatcherException(
-                            String.format("Could not start the added job %s", jobId),
-                            ExceptionUtils.stripCompletionException(throwable)));
+        runAsync(
+            () -> {
+                if (!jobManagerRunners.containsKey(jobId)) {
+                    final CompletableFuture recoveredJob = recoveryOperation.thenApplyAsync(
+                        ignored -> {
+                            try {
+                                return recoverJob(jobId);
+                            } catch (Exception e) {
+                                ExceptionUtils.rethrow(e);
+                            }
+                            return null;
+                        },
+                        getRpcService().getExecutor());
+
+                    final DispatcherId dispatcherId = getFencingToken();
+                    final CompletableFuture submissionFuture = recoveredJob.thenComposeAsync(
+                        (FunctionWithThrowable, Exception>) (JobGraph jobGraph) -> tryRunRecoveredJobGraph(jobGraph, dispatcherId)
+                            .thenAcceptAsync(
+                                (ConsumerWithException) (Boolean isRecoveredJobRunning) -> {
```

Review comment: Imo we are doing this wrong.
The API would be much more readable with static factory methods:

```
/**
 * {@link Consumer} that can throw checked exceptions.
 */
@FunctionalInterface
public interface CheckedConsumer<T> {

    void checkedAccept(T t) throws Exception;

    static <T> Consumer<T> unchecked(CheckedConsumer<T> checkedConsumer) {
        return (t) -> {
            try {
                checkedConsumer.checkedAccept(t);
            } catch (Exception e) {
                ExceptionUtils.rethrow(e);
            }
        };
    }
}
```

This allows for:

```
CheckedConsumer.unchecked(isRecoveredJobRunning -> {
    ...
});
```

No casts are required. Also, when interacting with the Java API, it does not matter which exact type of exception can be thrown; what matters is that the checked exception becomes an unchecked one. We do not need to generify the exception type in `ConsumerWithException`.
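The same factory-method idea carries over to `Function`, which would also cover the `thenApplyAsync` recovery step in the quoted diff. There, the inline `try`/`catch` needs a trailing `return null` because the compiler cannot tell that the `void` method `ExceptionUtils.rethrow` always throws. `CheckedFunction`, `recoverJob` and `recover` below are hypothetical stand-ins, not Flink classes, and the rethrow logic is inlined so the sketch needs only the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Hypothetical Function counterpart of the proposed CheckedConsumer. */
@FunctionalInterface
interface CheckedFunction<T, R> {

    R checkedApply(T t) throws Exception;

    /** Adapts a CheckedFunction to a plain Function, wrapping checked exceptions. */
    static <T, R> Function<T, R> unchecked(CheckedFunction<T, R> checkedFunction) {
        return t -> {
            try {
                return checkedFunction.checkedApply(t);
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }
}

public class CheckedFunctionDemo {

    /** Hypothetical stand-in for a recovery method that declares a checked exception. */
    static String recoverJob(String jobId) throws Exception {
        if (jobId.isEmpty()) {
            throw new Exception("unknown job");
        }
        return "JobGraph(" + jobId + ")";
    }

    static String recover(String jobId) {
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Void> recoveryOperation = CompletableFuture.completedFuture(null);
            // No inline try/catch and no trailing "return null" are needed here.
            return recoveryOperation
                .thenApplyAsync(CheckedFunction.unchecked(ignored -> recoverJob(jobId)), ioExecutor)
                .join();
        } finally {
            ioExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(recover("job-1"));  // JobGraph(job-1)
    }
}
```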
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612345#comment-16612345 ] ASF GitHub Bot commented on FLINK-10255:

GJL commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217082714

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ##

```
@@ -879,24 +917,66 @@ public void handleError(final Exception exception) {
     @Override
     public void onAddedJobGraph(final JobID jobId) {
-        final CompletableFuture recoveredJob = getRpcService().execute(
-            () -> submittedJobGraphStore.recoverJobGraph(jobId));
-
-        final CompletableFuture submissionFuture = recoveredJob.thenComposeAsync(
-            (SubmittedJobGraph submittedJobGraph) -> submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT),
-            getMainThreadExecutor());
-
-        submissionFuture.whenComplete(
-            (Acknowledge acknowledge, Throwable throwable) -> {
-                if (throwable != null) {
-                    onFatalError(
-                        new DispatcherException(
-                            String.format("Could not start the added job %s", jobId),
-                            ExceptionUtils.stripCompletionException(throwable)));
+        runAsync(
+            () -> {
+                if (!jobManagerRunners.containsKey(jobId)) {
+                    final CompletableFuture recoveredJob = recoveryOperation.thenApplyAsync(
+                        ignored -> {
+                            try {
+                                return recoverJob(jobId);
+                            } catch (Exception e) {
+                                ExceptionUtils.rethrow(e);
+                            }
+                            return null;
+                        },
+                        getRpcService().getExecutor());
+
+                    final DispatcherId dispatcherId = getFencingToken();
+                    final CompletableFuture submissionFuture = recoveredJob.thenComposeAsync(
+                        (FunctionWithThrowable, Exception>) (JobGraph jobGraph) -> tryRunRecoveredJobGraph(jobGraph, dispatcherId)
```

Review comment: See my comment regarding the `ConsumerWithException`.
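The quoted diff captures the fencing token (`getFencingToken()`) before the asynchronous recovery and hands it to `tryRunRecoveredJobGraph`. Below is a minimal single-threaded model of that idea under stated assumptions: recovering a job graph locks it, a standby ignores the added-job-graph signal, and a recovered job presumably only runs if the token is unchanged, otherwise its lock is released. All names (`FencedRecoverySketch`, `startRecovery`, `finishRecovery`, `lockedJobs`) are hypothetical; the real Dispatcher performs these steps asynchronously, not in two direct calls.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

/** Hypothetical, single-threaded model of leader-guarded job recovery; not Flink's API. */
public class FencedRecoverySketch {

    /** Job ids locked by this instance (stands in for locks in the HA job graph store). */
    final Set<String> lockedJobs = new HashSet<>();
    final Set<String> runningJobs = new HashSet<>();

    /** Current fencing token; null while this instance is a standby. */
    UUID fencingToken;

    void grantLeadership() { fencingToken = UUID.randomUUID(); }

    void revokeLeadership() { fencingToken = null; }

    /** Step 1: only a leader recovers (and thereby locks); returns the token, or null if skipped. */
    UUID startRecovery(String jobId) {
        if (fencingToken == null || runningJobs.contains(jobId)) {
            return null;
        }
        lockedJobs.add(jobId);
        return fencingToken;
    }

    /** Step 2: run only if leadership is unchanged; otherwise release the lock again. */
    boolean finishRecovery(String jobId, UUID tokenAtStart) {
        if (tokenAtStart != null && tokenAtStart.equals(fencingToken)) {
            runningJobs.add(jobId);
            return true;
        }
        lockedJobs.remove(jobId);
        return false;
    }

    public static void main(String[] args) {
        FencedRecoverySketch standby = new FencedRecoverySketch();
        standby.finishRecovery("a", standby.startRecovery("a"));
        System.out.println(standby.lockedJobs);  // []

        FencedRecoverySketch leader = new FencedRecoverySketch();
        leader.grantLeadership();
        UUID token = leader.startRecovery("b");
        leader.revokeLeadership();  // leadership lost while recovering
        System.out.println(leader.finishRecovery("b", token) + " " + leader.lockedJobs);  // false []
    }
}
```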
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612344#comment-16612344 ] ASF GitHub Bot commented on FLINK-10255:

GJL commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217082586

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ##

```
@@ -879,24 +917,66 @@ public void handleError(final Exception exception) {
     @Override
     public void onAddedJobGraph(final JobID jobId) {
-        final CompletableFuture recoveredJob = getRpcService().execute(
-            () -> submittedJobGraphStore.recoverJobGraph(jobId));
-
-        final CompletableFuture submissionFuture = recoveredJob.thenComposeAsync(
-            (SubmittedJobGraph submittedJobGraph) -> submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT),
-            getMainThreadExecutor());
-
-        submissionFuture.whenComplete(
-            (Acknowledge acknowledge, Throwable throwable) -> {
-                if (throwable != null) {
-                    onFatalError(
-                        new DispatcherException(
-                            String.format("Could not start the added job %s", jobId),
-                            ExceptionUtils.stripCompletionException(throwable)));
+        runAsync(
+            () -> {
+                if (!jobManagerRunners.containsKey(jobId)) {
+                    final CompletableFuture recoveredJob = recoveryOperation.thenApplyAsync(
+                        ignored -> {
+                            try {
+                                return recoverJob(jobId);
+                            } catch (Exception e) {
+                                ExceptionUtils.rethrow(e);
+                            }
+                            return null;
+                        },
+                        getRpcService().getExecutor());
+
+                    final DispatcherId dispatcherId = getFencingToken();
+                    final CompletableFuture submissionFuture = recoveredJob.thenComposeAsync(
+                        (FunctionWithThrowable, Exception>) (JobGraph jobGraph) -> tryRunRecoveredJobGraph(jobGraph, dispatcherId)
+                            .thenAcceptAsync(
+                                (ConsumerWithException) (Boolean isRecoveredJobRunning) -> {
```

Review comment: Imo we are doing this wrong.
The API would be much more readable with static factory methods:

```
/**
 * {@link Consumer} that can throw checked exceptions.
 */
@FunctionalInterface
public interface CheckedConsumer<T> {

    void checkedAccept(T t) throws Exception;

    static <T> Consumer<T> unchecked(CheckedConsumer<T> checkedConsumer) {
        return (t) -> {
            try {
                checkedConsumer.checkedAccept(t);
            } catch (Exception e) {
                ExceptionUtils.rethrow(e);
            }
        };
    }
}
```

This allows for:

```
CheckedConsumer.unchecked(isRecoveredJobRunning -> {
    ...
});
```

No casts are required. Also, when interacting with the Java API, it does not matter what type of exception can be thrown. We do not need to generify the exception type in `ConsumerWithException`.
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612319#comment-16612319 ] ASF GitHub Bot commented on FLINK-10255:

GJL commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217077413

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ##

```
@@ -861,14 +861,18 @@ public void grantLeadership(final UUID newLeaderSessionID) {
             getMainThreadExecutor());
     }
-    protected CompletableFuture getJobTerminationFuture(JobID jobId) {
+    CompletableFuture getJobTerminationFuture(JobID jobId) {
```

Review comment: True, I missed it.
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612283#comment-16612283 ] ASF GitHub Bot commented on FLINK-10255:

tillrohrmann commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217068791

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ##

```
@@ -861,14 +861,18 @@ public void grantLeadership(final UUID newLeaderSessionID) {
             getMainThreadExecutor());
     }
-    protected CompletableFuture getJobTerminationFuture(JobID jobId) {
+    CompletableFuture getJobTerminationFuture(JobID jobId) {
         if (jobManagerRunners.containsKey(jobId)) {
             return FutureUtils.completedExceptionally(new DispatcherException(String.format("Job with job id %s is still running.", jobId)));
         } else {
             return jobManagerTerminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
         }
     }
+    CompletableFuture getRecoveryOperation() {
```

Review comment: Good point. Will add it.
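The `getJobTerminationFuture` body in this diff returns an exceptionally completed future while a `JobManagerRunner` for the job is still registered, and otherwise the stored termination future or an already completed one. A plain-JDK model of that lookup follows, assuming string job ids and substituting `IllegalStateException` for Flink's `DispatcherException` and a manual `completeExceptionally` for `FutureUtils.completedExceptionally`; the class name `TerminationFutureLookup` is hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hypothetical JDK-only model of Dispatcher#getJobTerminationFuture from the diff. */
public class TerminationFutureLookup {

    /** Jobs that still have a running JobManagerRunner. */
    final Set<String> runningJobs = new HashSet<>();

    /** Termination futures of jobs whose runner is shutting down or has shut down. */
    final Map<String, CompletableFuture<Void>> terminationFutures = new HashMap<>();

    CompletableFuture<Void> getJobTerminationFuture(String jobId) {
        if (runningJobs.contains(jobId)) {
            // Stands in for FutureUtils.completedExceptionally(new DispatcherException(...)).
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException(
                String.format("Job with job id %s is still running.", jobId)));
            return failed;
        }
        // No recorded termination future: treat the job as already terminated.
        return terminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
    }

    public static void main(String[] args) {
        TerminationFutureLookup dispatcher = new TerminationFutureLookup();
        dispatcher.runningJobs.add("a");
        System.out.println(dispatcher.getJobTerminationFuture("a").isCompletedExceptionally());  // true
        System.out.println(dispatcher.getJobTerminationFuture("b").isDone());  // true
    }
}
```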
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612280#comment-16612280 ] ASF GitHub Bot commented on FLINK-10255:

tillrohrmann commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217068649

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ##

```
@@ -861,14 +861,18 @@ public void grantLeadership(final UUID newLeaderSessionID) {
             getMainThreadExecutor());
     }
-    protected CompletableFuture getJobTerminationFuture(JobID jobId) {
+    CompletableFuture getJobTerminationFuture(JobID jobId) {
```

Review comment: It cannot be private since the `TestingDispatcher` needs to access it.
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612282#comment-16612282 ] ASF GitHub Bot commented on FLINK-10255:

tillrohrmann commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217068649

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ##

```
@@ -861,14 +861,18 @@ public void grantLeadership(final UUID newLeaderSessionID) {
             getMainThreadExecutor());
     }
-    protected CompletableFuture getJobTerminationFuture(JobID jobId) {
+    CompletableFuture getJobTerminationFuture(JobID jobId) {
```

Review comment: It cannot be private since the `TestingDispatcher` needs to access it.
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612279#comment-16612279 ] ASF GitHub Bot commented on FLINK-10255:

tillrohrmann commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217068175

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java ##

```
@@ -80,9 +80,16 @@ void completeJobExecution(ArchivedExecutionGraph archivedExecutionGraph) {
     }
     @VisibleForTesting
-    public CompletableFuture getJobTerminationFuture(@Nonnull JobID jobId, @Nonnull Time timeout) {
+    CompletableFuture getJobTerminationFuture(@Nonnull JobID jobId, @Nonnull Time timeout) {
         return callAsyncWithoutFencing(
             () -> getJobTerminationFuture(jobId),
             timeout).thenCompose(Function.identity());
     }
+
+    @VisibleForTesting
```

Review comment: You're right. Will change it.
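`TestingDispatcher#getJobTerminationFuture` in this diff runs a method that already returns a future through `callAsyncWithoutFencing`, which yields a future of a future; `thenCompose(Function.identity())` flattens it. A JDK-only illustration follows, in which `CompletableFuture.supplyAsync` stands in (as an assumption) for Flink's RPC call, and `callAsync` and `terminationFuture` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

public class FlattenNestedFuture {

    /** Stand-in (assumption) for an RPC helper such as callAsyncWithoutFencing. */
    static <T> CompletableFuture<T> callAsync(Supplier<T> call) {
        return CompletableFuture.supplyAsync(call);
    }

    /** The call itself returns a future, so callAsync yields a future of a future. */
    static CompletableFuture<String> terminationFuture(CompletableFuture<String> inner) {
        CompletableFuture<CompletableFuture<String>> nested = callAsync(() -> inner);
        // thenCompose(Function.identity()) flattens it into a CompletableFuture<String>
        // that completes only once the inner future completes.
        return nested.thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        CompletableFuture<String> inner = new CompletableFuture<>();
        CompletableFuture<String> flat = terminationFuture(inner);
        inner.complete("terminated");
        System.out.println(flat.join());  // terminated
    }
}
```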
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612229#comment-16612229 ]
ASF GitHub Bot commented on FLINK-10255:
tillrohrmann commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217058569

## File path: flink-core/src/main/java/org/apache/flink/util/function/FunctionWithThrowable.java ##
@@ -0,0 +1,48 @@
+/*

Review comment:
Yes, I will rearrange the commits before merging. Locally, 3a07ee8812afb3feb2233d722311c4de08eef09a comes before 868c7dd6822cee24046b486874fef5d5d7d3564d, which makes it work.
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612198#comment-16612198 ]
ASF GitHub Bot commented on FLINK-10255:
GJL commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217043254

## File path: flink-core/src/main/java/org/apache/flink/util/function/FunctionWithThrowable.java ##
@@ -0,0 +1,48 @@
+/*

Review comment:
I think this file should be added in a commit that comes before `[FLINK-10255] Only react to onAddedJobGraph signal when being leader`, or it should be squashed. Without this class your previous commits would not work.
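This thread does not show the contents of the new `FunctionWithThrowable` file. The stack trace later in the thread shows `BiFunctionWithException.apply` being invoked directly by `CompletableFuture.biApply`. That suggests these utilities let a lambda that throws checked exceptions be used where a `java.util.function` type is expected. Here is a hypothetical sketch of that general pattern. The name `ThrowingFunction`, its methods, and the choice to wrap checked exceptions in `CompletionException` are assumptions, not Flink's actual API.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

final class ThrowingFunctionSketch {

    // Hypothetical functional interface whose body may throw a checked exception E.
    @FunctionalInterface
    interface ThrowingFunction<T, R, E extends Throwable> {
        R applyWithException(T value) throws E;

        // Adapts to java.util.function.Function: unchecked exceptions pass through
        // unchanged, checked ones are wrapped in a CompletionException.
        static <T, R, E extends Throwable> Function<T, R> unchecked(ThrowingFunction<T, R, E> fn) {
            return value -> {
                try {
                    return fn.applyWithException(value);
                } catch (RuntimeException | Error e) {
                    throw e;
                } catch (Throwable t) {
                    throw new CompletionException(t);
                }
            };
        }
    }

    // Example operation that declares a checked exception.
    static int parsePort(String raw) throws IOException {
        int port = Integer.parseInt(raw.trim());
        if (port <= 0 || port > 65535) {
            throw new IOException("invalid port: " + raw);
        }
        return port;
    }

    public static void main(String[] args) {
        Function<String, Integer> parse = ThrowingFunction.unchecked(ThrowingFunctionSketch::parsePort);

        CompletableFuture<Integer> ok = CompletableFuture.completedFuture("8081").thenApply(parse);
        CompletableFuture<Integer> bad = CompletableFuture.completedFuture("70000").thenApply(parse);

        System.out.println(ok.join());
        System.out.println(bad.isCompletedExceptionally());
    }
}
```

Wrapping in `CompletionException` means a failure inside a `CompletableFuture` stage completes the resulting future exceptionally instead of being lost.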
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612195#comment-16612195 ]
ASF GitHub Bot commented on FLINK-10255:
GJL commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217046190

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ##
@@ -861,14 +861,18 @@ public void grantLeadership(final UUID newLeaderSessionID) {
 			getMainThreadExecutor());
 	}
-	protected CompletableFuture getJobTerminationFuture(JobID jobId) {
+	CompletableFuture getJobTerminationFuture(JobID jobId) {
 		if (jobManagerRunners.containsKey(jobId)) {
 			return FutureUtils.completedExceptionally(new DispatcherException(String.format("Job with job id %s is still running.", jobId)));
 		} else {
 			return jobManagerTerminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
 		}
 	}
+
+	CompletableFuture getRecoveryOperation() {

Review comment:
This method has a wider visibility scope than necessary and is part of production code. I think `@VisibleForTesting` should be added.
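GJL's suggestion is to mark a production method whose visibility was widened only for tests, such as `getRecoveryOperation()` in the diff above. Flink has its own marker for this (`org.apache.flink.annotation.VisibleForTesting`). To keep the sketch self-contained, it declares a local annotation with the same name instead. The `RecoveringService` class and its recovery future are hypothetical and only loosely modeled on the diff.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;
import java.util.concurrent.CompletableFuture;

final class VisibleForTestingSketch {

    // Local stand-in for a marker annotation: documents that an element's
    // visibility was relaxed only so that tests can reach it.
    @Documented
    @Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR})
    @interface VisibleForTesting {}

    // Hypothetical component that recovers its state asynchronously.
    static final class RecoveringService {
        private CompletableFuture<Void> recoveryOperation = CompletableFuture.completedFuture(null);

        void startRecovery(CompletableFuture<Void> recovery) {
            recoveryOperation = recovery;
        }

        // Package-private instead of private so tests in the same package can wait
        // for recovery to finish; the annotation records that this is the only reason.
        @VisibleForTesting
        CompletableFuture<Void> getRecoveryOperation() {
            return recoveryOperation;
        }
    }

    public static void main(String[] args) {
        RecoveringService service = new RecoveringService();
        CompletableFuture<Void> recovery = new CompletableFuture<>();
        service.startRecovery(recovery);
        System.out.println(service.getRecoveryOperation().isDone());
        recovery.complete(null);
        System.out.println(service.getRecoveryOperation().isDone());
    }
}
```

The annotation has no runtime effect. Its value is that reviewers and static-analysis tools can flag production callers of the method.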
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612197#comment-16612197 ]
ASF GitHub Bot commented on FLINK-10255:
GJL commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217044938

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java ##
@@ -80,9 +80,16 @@ void completeJobExecution(ArchivedExecutionGraph archivedExecutionGraph) {
 	}
 	@VisibleForTesting
-	public CompletableFuture getJobTerminationFuture(@Nonnull JobID jobId, @Nonnull Time timeout) {
+	CompletableFuture getJobTerminationFuture(@Nonnull JobID jobId, @Nonnull Time timeout) {
 		return callAsyncWithoutFencing(
 			() -> getJobTerminationFuture(jobId),
 			timeout).thenCompose(Function.identity());
 	}
+
+	@VisibleForTesting

Review comment:
Not sure about `@VisibleForTesting` here. This class is already a test utility. It is even in the `test` sources directory.
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612196#comment-16612196 ]
ASF GitHub Bot commented on FLINK-10255:
GJL commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217045367

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ##
@@ -861,14 +861,18 @@ public void grantLeadership(final UUID newLeaderSessionID) {
 			getMainThreadExecutor());
 	}
-	protected CompletableFuture getJobTerminationFuture(JobID jobId) {
+	CompletableFuture getJobTerminationFuture(JobID jobId) {

Review comment:
Should be `private`.
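The `getJobTerminationFuture` body shown in the `Dispatcher.java` hunk of GJL's `getRecoveryOperation()` comment does two things. It fails fast while a job is still running. Otherwise it returns the stored termination future, defaulting to an already-completed one. Below is a JDK-only sketch of that lookup, made `private` as suggested and placed in a hypothetical `JobRegistry` class. `String` job ids stand in for `JobID` and `IllegalStateException` stands in for `DispatcherException`. A manual `completeExceptionally` replaces `FutureUtils.completedExceptionally`. The `runAfterTermination` caller is also hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

final class JobRegistry {

    // Ids of jobs whose job manager is still running.
    private final Set<String> runningJobs = new HashSet<>();
    // Futures that complete once a job's job manager has terminated.
    private final Map<String, CompletableFuture<Void>> terminationFutures = new HashMap<>();

    void markRunning(String jobId) {
        runningJobs.add(jobId);
    }

    void markTerminating(String jobId, CompletableFuture<Void> terminationFuture) {
        runningJobs.remove(jobId);
        terminationFutures.put(jobId, terminationFuture);
    }

    // Private, as suggested in the review: only internal callers need it.
    private CompletableFuture<Void> getJobTerminationFuture(String jobId) {
        if (runningJobs.contains(jobId)) {
            // Fail fast: a job with this id is still running.
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException(
                    String.format("Job with job id %s is still running.", jobId)));
            return failed;
        }
        // Jobs without a recorded termination future count as already terminated.
        return terminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
    }

    // Hypothetical internal caller: run an action once the previous job with this id is gone.
    CompletableFuture<Void> runAfterTermination(String jobId, Runnable action) {
        return getJobTerminationFuture(jobId).thenRun(action);
    }

    public static void main(String[] args) {
        JobRegistry registry = new JobRegistry();
        registry.markRunning("job-1");
        System.out.println(registry.runAfterTermination("job-1", () -> { }).isCompletedExceptionally());

        CompletableFuture<Void> termination = new CompletableFuture<>();
        registry.markTerminating("job-1", termination);
        CompletableFuture<Void> next =
                registry.runAfterTermination("job-1", () -> System.out.println("job-1 may be resubmitted"));
        termination.complete(null);
        System.out.println(next.isDone());
    }
}
```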
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612185#comment-16612185 ]
ASF GitHub Bot commented on FLINK-10255:
GJL commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217043254

## File path: flink-core/src/main/java/org/apache/flink/util/function/FunctionWithThrowable.java ##
@@ -0,0 +1,48 @@
+/*

Review comment:
I think this commit should be before `[FLINK-10255] Only react to onAddedJobGraph signal when being leader`, or it should be squashed (`fixup`). Without this class your previous commits would not work.
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612186#comment-16612186 ]
ASF GitHub Bot commented on FLINK-10255:
GJL commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217043254

## File path: flink-core/src/main/java/org/apache/flink/util/function/FunctionWithThrowable.java ##
@@ -0,0 +1,48 @@
+/*

Review comment:
I think this commit should be before `[FLINK-10255] Only react to onAddedJobGraph signal when being leader`, or it should be squashed. Without this class your previous commits would not work.
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612172#comment-16612172 ]
ASF GitHub Bot commented on FLINK-10255:
GJL commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217043254

## File path: flink-core/src/main/java/org/apache/flink/util/function/FunctionWithThrowable.java ##
@@ -0,0 +1,48 @@
+/*

Review comment:
I think this commit should be before `[FLINK-10255] Only react to onAddedJobGraph signal when being leader`, or it should be squashed.
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16610263#comment-16610263 ]
ASF GitHub Bot commented on FLINK-10255:
tillrohrmann commented on issue #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#issuecomment-420190020

Thanks for the comments @TisonKun. I've fixed the failing `DispatcherTest#testOnAddedJobGraphWithFinishedJob`.
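The PR title describes the fix as reacting to the `onAddedJobGraph` signal only while being leader. Below is a simplified, single-threaded sketch of such a guard. It assumes a toy store in which recovering a job graph locks it. The `ToyJobGraphStore` and `LeaderAwareListener` types are hypothetical and are not the Dispatcher's actual code. The sketch also ignores races between leadership revocation and concurrent listener callbacks, which a real implementation has to handle.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class LeaderAwareRecovery {

    // Hypothetical store: recovering a job graph locks it for the caller.
    static final class ToyJobGraphStore {
        private final Set<String> locked = new HashSet<>();

        String recoverJobGraph(String jobId) {
            locked.add(jobId);
            return "JobGraph(" + jobId + ")";
        }

        boolean isLocked(String jobId) {
            return locked.contains(jobId);
        }
    }

    // Hypothetical listener that ignores added job graphs unless it is the leader.
    static final class LeaderAwareListener {
        private final ToyJobGraphStore store;
        private final Map<String, String> runningJobs = new HashMap<>();
        private boolean leader;

        LeaderAwareListener(ToyJobGraphStore store) {
            this.store = store;
        }

        void grantLeadership() {
            leader = true;
        }

        void revokeLeadership() {
            leader = false;
        }

        void onAddedJobGraph(String jobId) {
            // A standby instance must not recover (and thereby lock) the job graph;
            // a job that is already running is not recovered a second time.
            if (!leader || runningJobs.containsKey(jobId)) {
                return;
            }
            runningJobs.put(jobId, store.recoverJobGraph(jobId));
        }

        Set<String> runningJobIds() {
            return runningJobs.keySet();
        }
    }

    public static void main(String[] args) {
        ToyJobGraphStore store = new ToyJobGraphStore();
        LeaderAwareListener dispatcher = new LeaderAwareListener(store);

        dispatcher.onAddedJobGraph("job-1"); // standby: signal ignored
        System.out.println(store.isLocked("job-1"));

        dispatcher.grantLeadership();
        dispatcher.onAddedJobGraph("job-1"); // leader: recovered and locked
        System.out.println(store.isLocked("job-1"));
    }
}
```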
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16610223#comment-16610223 ]
ASF GitHub Bot commented on FLINK-10255:
tillrohrmann commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r216575018

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java ##
@@ -85,5 +87,13 @@
 		return FutureUtils.completeAll(terminationFutures);
 	}
+	public static void stopActor(AkkaActorGateway akkaActorGateway) {

Review comment:
Good point. I will clean this up as a follow-up.
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16610117#comment-16610117 ]
ASF GitHub Bot commented on FLINK-10255:
TisonKun commented on issue #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#issuecomment-420151979

Travis shows relevant failures; I will take a closer look later.

```
testGrantingRevokingLeadership(org.apache.flink.runtime.dispatcher.DispatcherHATest) Time elapsed: 0.024 sec <<< ERROR!
org.apache.flink.runtime.util.TestingFatalErrorHandler$TestingException: java.lang.UnsupportedOperationException: Should not be called.
	at org.apache.flink.runtime.util.TestingFatalErrorHandler.rethrowError(TestingFatalErrorHandler.java:51)
	at org.apache.flink.runtime.dispatcher.DispatcherHATest.teardown(DispatcherHATest.java:98)
Caused by: java.lang.UnsupportedOperationException: Should not be called.
	at org.apache.flink.runtime.dispatcher.DispatcherHATest$BlockingSubmittedJobGraphStore.releaseJobGraph(DispatcherHATest.java:306)
	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$26(Dispatcher.java:809)
	at org.apache.flink.util.function.BiFunctionWithException.apply(BiFunctionWithException.java:49)
	at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1105)
	at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1070)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

```
Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.45 sec <<< FAILURE! - in org.apache.flink.runtime.dispatcher.DispatcherHATest
testGrantingRevokingLeadership(org.apache.flink.runtime.dispatcher.DispatcherHATest) Time elapsed: 0.028 sec <<< ERROR!
org.apache.flink.runtime.util.TestingFatalErrorHandler$TestingException: java.lang.UnsupportedOperationException: Should not be called.
	at org.apache.flink.runtime.util.TestingFatalErrorHandler.rethrowError(TestingFatalErrorHandler.java:51)
	at org.apache.flink.runtime.dispatcher.DispatcherHATest.teardown(DispatcherHATest.java:98)
Caused by: java.lang.UnsupportedOperationException: Should not be called.
	at org.apache.flink.runtime.dispatcher.DispatcherHATest$BlockingSubmittedJobGraphStore.releaseJobGraph(DispatcherHATest.java:306)
	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$26(Dispatcher.java:809)
	at org.apache.flink.util.function.BiFunctionWithException.apply(BiFunctionWithException.java:49)
	at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1105)
	at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1070)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

```
A TaskManager should go into a clean state in case of a JobManager failure(org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase) Time elapsed: 121.247 sec <<< FAILURE!
java.lang.AssertionError: assertion failed: timeout (119585594930 nanoseconds) during expectMsg while waiting for Acknowledge
	at scala.Predef$.assert(Predef.scala:170)
	at akka.testkit.TestKitBase$class.expectMsg_internal(TestKit.scala:387)
	at akka.testkit.TestKitBase$class.expectMsg(TestKit.scala:364)
	at akka.testkit.TestKit.expectMsg(TestKit.scala:814)
	at org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcV$sp$4.apply$mcV$sp(JobManagerFailsITCase.scala:118)
	at
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16610111#comment-16610111 ]

ASF GitHub Bot commented on FLINK-10255:

TisonKun commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r216548720

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java ##
@@ -85,5 +87,13 @@
 return FutureUtils.completeAll(terminationFutures);
 }
 
+ public static void stopActor(AkkaActorGateway akkaActorGateway) {

Review comment:
In this PR we introduce `stopActor`, which is used in only one place. Checking the whole project, we have defined many `stopActor` methods here and there. Most of them are in `TestingUtils`, but there are also some in `MesosResourceManager` and `FlinkUntypedActorTest`. Some use `PoisonPill` and others use `Kill`. Independent of this PR, since everything that interacts with Akka depends on `flink-runtime`, let's unify the `stopActor` utilities. I think `ActorUtils` is the best place for them.
[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
[ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16609200#comment-16609200 ]

ASF GitHub Bot commented on FLINK-10255:

tillrohrmann opened a new pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678

## What is the purpose of the change

The Dispatcher should only react to the onAddedJobGraph signal if it is the leader. In all other cases the signal should be ignored, since the jobs will be recovered once the Dispatcher becomes the leader.

In order to still support non-blocking job recoveries, this commit serializes all recovery operations by introducing a recoveryOperation future which first needs to complete before a subsequent operation is started. That way we can avoid race conditions between granting and revoking leadership as well as the onAddedJobGraph signals. This is important since we can only lock each JobGraph once and, thus, need to make sure that we don't release a lock of a properly recovered job in a concurrent operation.
cc @GJL

## Brief change log

- Only react to `SubmittedJobGraphListener#onAddedJobGraph` when being the leader
- Serialize recovery operations by introducing a `recoveryOperation` future in order to avoid wrong unlocking of guarded resources

## Verifying this change

- Added `ZooKeeperHADispatcherTest#testStandbyDispatcherJobExecution` and `ZooKeeperHADispatcherTest#testStandbyDispatcherJobRecovery`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
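The two change-log items of pull request #6678 (ignore `onAddedJobGraph` unless the Dispatcher is the leader; serialize recovery operations through a `recoveryOperation` future) can be illustrated with a small, self-contained Java sketch. It is not the Dispatcher code from the pull request: the class `SerializedRecoverySketch`, its `leader` flag, `runSerialized`, and `recoverJob` are hypothetical, and `recoverJob` only logs instead of recovering and locking a JobGraph in a `SubmittedJobGraphStore`. What it shows is the chaining idea: each new operation is composed onto the previous future, so a later recovery cannot start (and, in the real case, cannot release a lock) while an earlier one is still in flight.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch of the pattern described in PR #6678; not Flink's Dispatcher code.
class SerializedRecoverySketch {
    // Tail of the operation chain: each new recovery operation starts only after it completes.
    private CompletableFuture<Void> recoveryOperation = CompletableFuture.completedFuture(null);
    private volatile boolean leader = false;
    // Records the order in which operations ran, for illustration only.
    final List<String> log = Collections.synchronizedList(new ArrayList<>());

    void setLeader(boolean leader) {
        this.leader = leader;
    }

    // Appends an operation to the chain. synchronized only because this sketch may be
    // called from several threads; a single-threaded RPC main thread would not need it.
    synchronized CompletableFuture<Void> runSerialized(Supplier<CompletableFuture<Void>> operation) {
        recoveryOperation = recoveryOperation
            // Sketch-specific choice: a failed earlier operation must not block later ones.
            .handle((ignored, failure) -> null)
            .thenCompose(ignored -> operation.get());
        return recoveryOperation;
    }

    // A standby instance ignores the signal; the job is recovered once leadership is granted.
    CompletableFuture<Void> onAddedJobGraph(String jobId) {
        if (!leader) {
            log.add("ignored " + jobId);
            return CompletableFuture.completedFuture(null);
        }
        return runSerialized(() -> recoverJob(jobId));
    }

    // Stand-in for recovering (and locking) a JobGraph asynchronously.
    private CompletableFuture<Void> recoverJob(String jobId) {
        return CompletableFuture.runAsync(() -> log.add("recovered " + jobId));
    }

    public static void main(String[] args) {
        SerializedRecoverySketch dispatcher = new SerializedRecoverySketch();
        dispatcher.onAddedJobGraph("job-1").join(); // standby: ignored, nothing is locked
        dispatcher.setLeader(true);
        CompletableFuture<Void> first = dispatcher.onAddedJobGraph("job-2");
        CompletableFuture<Void> second = dispatcher.onAddedJobGraph("job-3");
        second.join(); // can only complete after `first` has completed
        System.out.println(first.isDone() + " " + dispatcher.log);
    }
}
```

Running `main` prints whether the first operation is done together with the log order; because of the chaining, `job-3` is always recovered after `job-2`. Converting failures with `handle` is a choice made for this sketch only; the pull request description does not say how a failed recovery affects subsequent operations.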