[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623752#comment-16623752
 ] 

ASF GitHub Bot commented on FLINK-10255:


Clark commented on issue #6678: [FLINK-10255] Only react to onAddedJobGraph 
signal when being leader
URL: https://github.com/apache/flink/pull/6678#issuecomment-423567013
 
 
   Thanks for the reply, that'll make sense.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Standby Dispatcher locks submitted JobGraphs
> 
>
> Key: FLINK-10255
> URL: https://issues.apache.org/jira/browse/FLINK-10255
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination
> Affects Versions: 1.5.3, 1.6.0, 1.7.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.6.1, 1.7.0, 1.5.4
>
>
> Currently, standby {{Dispatchers}} lock submitted {{JobGraphs}} which are 
> added to the {{SubmittedJobGraphStore}} if HA mode is enabled. Locking the 
> {{JobGraphs}} can prevent their cleanup, leaving the system in an inconsistent 
> state.
> The problem is that we recover in the 
> {{SubmittedJobGraphListener#onAddedJobGraph}} callback, which is also called 
> for a newly added {{JobGraph}} if we don't have the leadership. Recovering the 
> {{JobGraph}} currently locks the {{JobGraph}}. If the 
> {{Dispatcher}} is not the leader, then we won't start that job after its 
> recovery. However, we also don't release the {{JobGraph}}, leaving it locked.
> There are two possible solutions to the problem. Either we check whether we 
> are the leader before recovering jobs, or we say that recovering jobs does not 
> lock them and that we lock a recovered job only if we can submit it. The latter 
> approach has the advantage that it follows a code path quite similar to that 
> of an initial job submission. Moreover, jobs are currently also recovered at other 
> places. In all these places we currently would need to release the 
> {{JobGraphs}} if we cannot submit the recovered {{JobGraph}} (e.g. 
> {{Dispatcher#grantLeadership}}).
> An extension of the first solution could be to stop the 
> {{SubmittedJobGraphStore}} while the {{Dispatcher}} is not the leader. Then 
> we would have to make sure that no concurrent callback from the 
> {{SubmittedJobGraphStore#SubmittedJobGraphListener}} can be executed after 
> revoking leadership from the {{Dispatcher}}.
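
The first solution from the description (check leadership before recovering) can be illustrated with a minimal sketch. Everything below is a hypothetical stand-in: `ToyJobGraphStore` and `ToyDispatcher` are not Flink classes, and the real `SubmittedJobGraphStore` locks through ZooKeeper rather than an in-memory set. The only point is that a standby which never recovers also never takes a lock.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical, simplified stand-ins; these are not Flink's actual classes. */
public class LeaderGuardSketch {

    /** Toy store that records which job ids are currently locked. */
    static final class ToyJobGraphStore {
        private final Set<String> locked = new HashSet<>();

        /** Recovering a job graph locks it, mirroring the behaviour described in the issue. */
        String recoverJobGraph(String jobId) {
            locked.add(jobId);
            return "graph-" + jobId;
        }

        boolean isLocked(String jobId) {
            return locked.contains(jobId);
        }
    }

    /** Toy dispatcher that ignores the "added" signal unless it holds leadership. */
    static final class ToyDispatcher {
        private final ToyJobGraphStore store;
        private final Set<String> running = new HashSet<>();
        private boolean leader;

        ToyDispatcher(ToyJobGraphStore store) {
            this.store = store;
        }

        void grantLeadership() {
            leader = true;
        }

        void onAddedJobGraph(String jobId) {
            if (!leader) {
                // Standby: do not recover, so no lock is ever taken.
                return;
            }
            store.recoverJobGraph(jobId);
            running.add(jobId);
        }

        boolean isRunning(String jobId) {
            return running.contains(jobId);
        }
    }

    public static void main(String[] args) {
        ToyJobGraphStore store = new ToyJobGraphStore();
        ToyDispatcher dispatcher = new ToyDispatcher(store);

        dispatcher.onAddedJobGraph("job-1");
        System.out.println("locked while standby: " + store.isLocked("job-1"));

        dispatcher.grantLeadership();
        dispatcher.onAddedJobGraph("job-1");
        System.out.println("locked while leader: " + store.isLocked("job-1"));
    }
}
```

In this toy model the signal is simply dropped while the dispatcher is a standby; the description notes that jobs are also recovered elsewhere (e.g. when leadership is granted), which the sketch does not model.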



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623264#comment-16623264
 ] 

ASF GitHub Bot commented on FLINK-10255:


tillrohrmann commented on issue #6678: [FLINK-10255] Only react to 
onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#issuecomment-423455756
 
 
   I think it should not be possible to have two async recovery operations 
ongoing since either of them will have to wait for the other to complete. That 
was the idea of the fix.
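
A minimal sketch of the general technique this reply alludes to: each new asynchronous operation is chained onto the future of the previous one, so it cannot start until its predecessor has completed. The class, the field reassignment, and the method names are assumptions made for illustration; this is not the code from the pull request, which may manage `recoveryOperation` differently.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch; names are illustrative and not taken from Flink. */
public class SerializedRecoverySketch {

    // Several threads are available, yet the chained operations never overlap.
    private final ExecutorService ioExecutor = Executors.newFixedThreadPool(4);
    private final List<String> events = Collections.synchronizedList(new ArrayList<>());

    // Tail of the chain. It is only reassigned from the calling thread,
    // which stands in for a single-threaded main thread executor.
    private CompletableFuture<Void> recoveryOperation = CompletableFuture.completedFuture(null);

    /** Appends a recovery step that starts only after all earlier steps have finished. */
    CompletableFuture<Void> enqueueRecovery(String jobId) {
        recoveryOperation = recoveryOperation.thenRunAsync(
            () -> {
                events.add("start " + jobId);
                pause(20L);
                events.add("end " + jobId);
            },
            ioExecutor);
        return recoveryOperation;
    }

    /** Returns a copy of the recorded events. */
    List<String> events() {
        synchronized (events) {
            return new ArrayList<>(events);
        }
    }

    void shutdown() {
        ioExecutor.shutdown();
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        SerializedRecoverySketch sketch = new SerializedRecoverySketch();
        sketch.enqueueRecovery("a");
        sketch.enqueueRecovery("b");
        sketch.enqueueRecovery("c").join();
        System.out.println(sketch.events());
        sketch.shutdown();
    }
}
```

Note that in this sketch a step that fails would cause all later chained steps to be skipped, because `thenRunAsync` does not run its action when the previous stage completed exceptionally; a real implementation has to decide how to handle that case.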




[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618729#comment-16618729
 ] 

ASF GitHub Bot commented on FLINK-10255:


Clark commented on issue #6678: [FLINK-10255] Only react to onAddedJobGraph 
signal when being leader
URL: https://github.com/apache/flink/pull/6678#issuecomment-422313111
 
 
   Hi @tillrohrmann, is it possible for two async operations to modify the 
same recoveryOperation at the same time? Would that be serializable in that 
case? 




[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614802#comment-16614802
 ] 

ASF GitHub Bot commented on FLINK-10255:


asfgit closed pull request #6678: [FLINK-10255] Only react to onAddedJobGraph 
signal when being leader
URL: https://github.com/apache/flink/pull/6678
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
index 326e924b448..d8ad5aba8ea 100644
--- 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
+++ 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
@@ -22,6 +22,7 @@
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Properties;
 
@@ -36,8 +37,8 @@
@Nonnull
private final SavepointRestoreSettings savepointRestoreSettings;
 
-   public StandaloneJobClusterConfiguration(@Nonnull String configDir, 
@Nonnull Properties dynamicProperties, @Nonnull String[] args, int restPort, 
@Nonnull String jobClassName, @Nonnull SavepointRestoreSettings 
savepointRestoreSettings) {
-   super(configDir, dynamicProperties, args, restPort);
+   public StandaloneJobClusterConfiguration(@Nonnull String configDir, 
@Nonnull Properties dynamicProperties, @Nonnull String[] args, @Nullable String 
hostname, int restPort, @Nonnull String jobClassName, @Nonnull 
SavepointRestoreSettings savepointRestoreSettings) {
+   super(configDir, dynamicProperties, args, hostname, restPort);
this.jobClassName = jobClassName;
this.savepointRestoreSettings = savepointRestoreSettings;
}
diff --git 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
index 3c65ba864ed..17217eff018 100644
--- 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
+++ 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
@@ -32,6 +32,7 @@
 
 import static 
org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.CONFIG_DIR_OPTION;
 import static 
org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION;
+import static 
org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.HOST_OPTION;
 import static 
org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.REST_PORT_OPTION;
 
 /**
@@ -67,6 +68,7 @@ public StandaloneJobClusterConfiguration 
createResult(@Nonnull CommandLine comma
final Properties dynamicProperties = 
commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt());
final String restPortString = 
commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1");
final int restPort = Integer.parseInt(restPortString);
+   final String hostname = 
commandLine.getOptionValue(HOST_OPTION.getOpt());
final String jobClassName = 
commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt());
final SavepointRestoreSettings savepointRestoreSettings = 
CliFrontendParser.createSavepointRestoreSettings(commandLine);
 
@@ -74,6 +76,7 @@ public StandaloneJobClusterConfiguration 
createResult(@Nonnull CommandLine comma
configDir,
dynamicProperties,
commandLine.getArgs(),
+   hostname,
restPort,
jobClassName,
savepointRestoreSettings);
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
 
b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
index 5864c8a985d..6fc5b76f246 100644
--- 
a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
+++ 
b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
@@ -30,7 +30,7 @@
  * @param  type of the thrown exception
  */
 @FunctionalInterface
-public interface BiConsumerWithException extends 
BiConsumer {
+public interface BiConsumerWithException {
 
/**
 * Performs 

[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613200#comment-16613200
 ] 

ASF GitHub Bot commented on FLINK-10255:


tillrohrmann commented on issue #6678: [FLINK-10255] Only react to 
onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#issuecomment-420928749
 
 
   Thanks for the review @GJL. I've addressed your comments and after Travis 
gives green light, I'll merge it.




[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613163#comment-16613163
 ] 

ASF GitHub Bot commented on FLINK-10255:


tillrohrmann commented on a change in pull request #6678: [FLINK-10255] Only 
react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217291997
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
 ##
 @@ -167,7 +181,7 @@ public void testSubmittedJobGraphRelease() throws 
Exception {
// recover the job
final SubmittedJobGraph submittedJobGraph = 
otherSubmittedJobGraphStore.recoverJobGraph(jobId);
 
-   assertThat(submittedJobGraph, 
Matchers.is(Matchers.notNullValue()));
+   assertThat(submittedJobGraph, 
is(Matchers.notNullValue()));
 
 Review comment:
   Good catch. Will change it.




[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16613162#comment-16613162
 ] 

ASF GitHub Bot commented on FLINK-10255:


tillrohrmann commented on a change in pull request #6678: [FLINK-10255] Only 
react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217291912
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -879,24 +917,66 @@ public void handleError(final Exception exception) {
 
@Override
public void onAddedJobGraph(final JobID jobId) {
-   final CompletableFuture recoveredJob = 
getRpcService().execute(
-   () -> submittedJobGraphStore.recoverJobGraph(jobId));
-
-   final CompletableFuture submissionFuture = 
recoveredJob.thenComposeAsync(
-   (SubmittedJobGraph submittedJobGraph) -> 
submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT),
-   getMainThreadExecutor());
-
-   submissionFuture.whenComplete(
-   (Acknowledge acknowledge, Throwable throwable) -> {
-   if (throwable != null) {
-   onFatalError(
-   new DispatcherException(
-   String.format("Could 
not start the added job %s", jobId),
-   
ExceptionUtils.stripCompletionException(throwable)));
+   runAsync(
+   () -> {
+   if (!jobManagerRunners.containsKey(jobId)) {
+   final CompletableFuture 
recoveredJob = recoveryOperation.thenApplyAsync(
+   ignored -> {
+   try {
+   return 
recoverJob(jobId);
+   } catch (Exception e) {
+   
ExceptionUtils.rethrow(e);
+   }
+   return null;
+   },
+   getRpcService().getExecutor());
+
+   final DispatcherId dispatcherId = 
getFencingToken();
+   final CompletableFuture 
submissionFuture = recoveredJob.thenComposeAsync(
+   
(FunctionWithThrowable, Exception>) (JobGraph 
jobGraph) -> tryRunRecoveredJobGraph(jobGraph, dispatcherId)
+   .thenAcceptAsync(
+   
(ConsumerWithException) (Boolean isRecoveredJobRunning) -> {
 
 Review comment:
   Good point. I like this approach better. Will adapt the existing interfaces.



[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612370#comment-16612370
 ] 

ASF GitHub Bot commented on FLINK-10255:


GJL commented on a change in pull request #6678: [FLINK-10255] Only react to 
onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217085458
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
 ##
 @@ -167,7 +181,7 @@ public void testSubmittedJobGraphRelease() throws 
Exception {
// recover the job
final SubmittedJobGraph submittedJobGraph = 
otherSubmittedJobGraphStore.recoverJobGraph(jobId);
 
-   assertThat(submittedJobGraph, 
Matchers.is(Matchers.notNullValue()));
+   assertThat(submittedJobGraph, 
is(Matchers.notNullValue()));
 
 Review comment:
   You added a static import for `is` but not for `notNullValue`. I think this 
should be consistent.




[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612362#comment-16612362
 ] 

ASF GitHub Bot commented on FLINK-10255:


GJL commented on a change in pull request #6678: [FLINK-10255] Only react to 
onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217082586
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -879,24 +917,66 @@ public void handleError(final Exception exception) {
 
@Override
public void onAddedJobGraph(final JobID jobId) {
-   final CompletableFuture recoveredJob = 
getRpcService().execute(
-   () -> submittedJobGraphStore.recoverJobGraph(jobId));
-
-   final CompletableFuture submissionFuture = 
recoveredJob.thenComposeAsync(
-   (SubmittedJobGraph submittedJobGraph) -> 
submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT),
-   getMainThreadExecutor());
-
-   submissionFuture.whenComplete(
-   (Acknowledge acknowledge, Throwable throwable) -> {
-   if (throwable != null) {
-   onFatalError(
-   new DispatcherException(
-   String.format("Could 
not start the added job %s", jobId),
-   
ExceptionUtils.stripCompletionException(throwable)));
+   runAsync(
+   () -> {
+   if (!jobManagerRunners.containsKey(jobId)) {
+   final CompletableFuture 
recoveredJob = recoveryOperation.thenApplyAsync(
+   ignored -> {
+   try {
+   return 
recoverJob(jobId);
+   } catch (Exception e) {
+   
ExceptionUtils.rethrow(e);
+   }
+   return null;
+   },
+   getRpcService().getExecutor());
+
+   final DispatcherId dispatcherId = 
getFencingToken();
+   final CompletableFuture 
submissionFuture = recoveredJob.thenComposeAsync(
+   
(FunctionWithThrowable, Exception>) (JobGraph 
jobGraph) -> tryRunRecoveredJobGraph(jobGraph, dispatcherId)
+   .thenAcceptAsync(
+   
(ConsumerWithException) (Boolean isRecoveredJobRunning) -> {
 
 Review comment:
   Imo we are doing this wrong. The code would be much more readable with 
static factory methods:
   
   ```
   /**
    * {@link Consumer} that can throw checked exceptions.
    */
   @FunctionalInterface
   public interface CheckedConsumer<T> {

       void checkedAccept(T t) throws Exception;

       static <T> Consumer<T> unchecked(CheckedConsumer<T> checkedConsumer) {
           return (t) -> {
               try {
                   checkedConsumer.checkedAccept(t);
               } catch (Exception e) {
                   ExceptionUtils.rethrow(e);
               }
           };
       }
   }
   ```
   This allows for:
   ```
   .thenAcceptAsync(CheckedConsumer.unchecked(isRecoveredJobRunning -> {
       ...
   }));
   ...
   ``` 
   No casts are required. Also, when interacting with the Java API, it does not 
matter what exact type of exception can be thrown – what matters is that the 
checked exception becomes unchecked. We do not need to generify the exception 
type in `ConsumerWithException`. 
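
For readers who want to try the idea outside of Flink, here is a self-contained sketch under stated assumptions. The local `rethrow` helper is a stand-in written for this example and is not Flink's `ExceptionUtils.rethrow`; `CheckedConsumerDemo` and `runAndCaptureFailure` are likewise hypothetical names. It shows that a lambda throwing a checked exception can be passed to `CompletableFuture#thenAccept` without casts, and that the failure surfaces wrapped in an unchecked exception.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

/** Hypothetical demo class; only the CheckedConsumer shape follows the review comment. */
public class CheckedConsumerDemo {

    /** {@link Consumer} variant whose body may throw checked exceptions. */
    @FunctionalInterface
    interface CheckedConsumer<T> {

        void checkedAccept(T t) throws Exception;

        /** Adapts a checked consumer to a plain Consumer by rethrowing unchecked. */
        static <T> Consumer<T> unchecked(CheckedConsumer<T> checkedConsumer) {
            return t -> {
                try {
                    checkedConsumer.checkedAccept(t);
                } catch (Exception e) {
                    rethrow(e);
                }
            };
        }
    }

    /** Local stand-in (assumption): passes runtime exceptions through, wraps checked ones. */
    static void rethrow(Exception e) {
        if (e instanceof RuntimeException) {
            throw (RuntimeException) e;
        }
        throw new RuntimeException(e.getMessage(), e);
    }

    /** Runs a stage that may throw a checked exception; returns the failure cause or null. */
    static Throwable runAndCaptureFailure(boolean isRecoveredJobRunning) {
        CompletableFuture<Void> stage = CompletableFuture
            .completedFuture(isRecoveredJobRunning)
            .thenAccept(CheckedConsumer.unchecked(running -> {
                if (!running) {
                    // A checked exception thrown from inside the lambda.
                    throw new IOException("could not release job graph");
                }
            }));
        try {
            stage.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println("running job: " + runAndCaptureFailure(true));
        System.out.println("not running: " + runAndCaptureFailure(false));
    }
}
```

The exception type is deliberately not a type parameter of `CheckedConsumer`, in line with the argument above that callers of the Java API only care that the checked exception becomes unchecked.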



[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612351#comment-16612351
 ] 

ASF GitHub Bot commented on FLINK-10255:


GJL commented on a change in pull request #6678: [FLINK-10255] Only react to 
onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217082586
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -879,24 +917,66 @@ public void handleError(final Exception exception) {
 
@Override
public void onAddedJobGraph(final JobID jobId) {
-   final CompletableFuture recoveredJob = 
getRpcService().execute(
-   () -> submittedJobGraphStore.recoverJobGraph(jobId));
-
-   final CompletableFuture submissionFuture = 
recoveredJob.thenComposeAsync(
-   (SubmittedJobGraph submittedJobGraph) -> 
submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT),
-   getMainThreadExecutor());
-
-   submissionFuture.whenComplete(
-   (Acknowledge acknowledge, Throwable throwable) -> {
-   if (throwable != null) {
-   onFatalError(
-   new DispatcherException(
-   String.format("Could 
not start the added job %s", jobId),
-   
ExceptionUtils.stripCompletionException(throwable)));
+   runAsync(
+   () -> {
+   if (!jobManagerRunners.containsKey(jobId)) {
+   final CompletableFuture 
recoveredJob = recoveryOperation.thenApplyAsync(
+   ignored -> {
+   try {
+   return 
recoverJob(jobId);
+   } catch (Exception e) {
+   
ExceptionUtils.rethrow(e);
+   }
+   return null;
+   },
+   getRpcService().getExecutor());
+
+   final DispatcherId dispatcherId = 
getFencingToken();
+   final CompletableFuture 
submissionFuture = recoveredJob.thenComposeAsync(
+   
(FunctionWithThrowable, Exception>) (JobGraph 
jobGraph) -> tryRunRecoveredJobGraph(jobGraph, dispatcherId)
+   .thenAcceptAsync(
+   
(ConsumerWithException) (Boolean isRecoveredJobRunning) -> {
 
 Review comment:
   Imo we are doing this wrong. The code would be much more readable with 
static factory methods:
   
   ```
   /**
    * {@link Consumer} that can throw checked exceptions.
    */
   @FunctionalInterface
   public interface CheckedConsumer<T> {
   
       void checkedAccept(T t) throws Exception;
   
       static <T> Consumer<T> unchecked(CheckedConsumer<T> checkedConsumer) {
           return (t) -> {
               try {
                   checkedConsumer.checkedAccept(t);
               } catch (Exception e) {
                   ExceptionUtils.rethrow(e);
               }
           };
       }
   }
   ```
   This allows for:
   ```
   .thenAcceptAsync(CheckedConsumer.unchecked(isRecoveredJobRunning -> {
       ...
   }));
   ...
   ``` 
   No casts are required. Also, when interacting with the Java API, it does not 
matter what exact type of exception can be thrown; what matters is that the 
checked exception becomes an unchecked one. We do not need to generify the 
exception type in `ConsumerWithException`. 
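
   As a rough illustration of the suggestion above, the following self-contained 
sketch wires such a factory method into a `CompletableFuture` chain. It is not 
code from the pull request: the class name `CheckedConsumerSketch`, the helper 
`run`, and the plain `RuntimeException` wrapping (standing in for Flink's 
`ExceptionUtils.rethrow`) are assumptions made only for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

final class CheckedConsumerSketch {

    /** Consumer that may throw checked exceptions (mirrors the proposed interface). */
    @FunctionalInterface
    interface CheckedConsumer<T> {

        void checkedAccept(T t) throws Exception;

        /** Adapts a throwing consumer to a plain java.util.function.Consumer. */
        static <T> Consumer<T> unchecked(CheckedConsumer<T> checkedConsumer) {
            return t -> {
                try {
                    checkedConsumer.checkedAccept(t);
                } catch (RuntimeException e) {
                    throw e;
                } catch (Exception e) {
                    // Assumption: stand-in for ExceptionUtils.rethrow(e).
                    throw new RuntimeException(e);
                }
            };
        }
    }

    /** Hypothetical helper: returns "ok" on success, else the original failure message. */
    static String run(boolean isRecoveredJobRunning) {
        CompletableFuture<Void> stage = CompletableFuture
            .completedFuture(isRecoveredJobRunning)
            // No cast and no explicit parameter type is needed on the lambda.
            .thenAccept(CheckedConsumer.unchecked(running -> {
                if (!running) {
                    throw new Exception("job was not started");
                }
            }));
        try {
            stage.join();
            return "ok";
        } catch (CompletionException e) {
            // join() wraps the RuntimeException, which in turn wraps the checked Exception.
            return e.getCause().getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```

   The call site stays free of casts because the target type of `thenAccept` 
lets the compiler infer `T`, which is the readability gain the comment argues for.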


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Standby Dispatcher locks submitted JobGraphs
> 
>
> Key: FLINK-10255
> URL: https://issues.apache.org/jira/browse/FLINK-10255
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>

[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612349#comment-16612349
 ] 

ASF GitHub Bot commented on FLINK-10255:


GJL commented on a change in pull request #6678: [FLINK-10255] Only react to 
onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217082586
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -879,24 +917,66 @@ public void handleError(final Exception exception) {
 
@Override
public void onAddedJobGraph(final JobID jobId) {
-   final CompletableFuture recoveredJob = 
getRpcService().execute(
-   () -> submittedJobGraphStore.recoverJobGraph(jobId));
-
-   final CompletableFuture submissionFuture = 
recoveredJob.thenComposeAsync(
-   (SubmittedJobGraph submittedJobGraph) -> 
submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT),
-   getMainThreadExecutor());
-
-   submissionFuture.whenComplete(
-   (Acknowledge acknowledge, Throwable throwable) -> {
-   if (throwable != null) {
-   onFatalError(
-   new DispatcherException(
-   String.format("Could 
not start the added job %s", jobId),
-   
ExceptionUtils.stripCompletionException(throwable)));
+   runAsync(
+   () -> {
+   if (!jobManagerRunners.containsKey(jobId)) {
+   final CompletableFuture 
recoveredJob = recoveryOperation.thenApplyAsync(
+   ignored -> {
+   try {
+   return 
recoverJob(jobId);
+   } catch (Exception e) {
+   
ExceptionUtils.rethrow(e);
+   }
+   return null;
+   },
+   getRpcService().getExecutor());
+
+   final DispatcherId dispatcherId = 
getFencingToken();
+   final CompletableFuture 
submissionFuture = recoveredJob.thenComposeAsync(
+   
(FunctionWithThrowable, Exception>) (JobGraph 
jobGraph) -> tryRunRecoveredJobGraph(jobGraph, dispatcherId)
+   .thenAcceptAsync(
+   
(ConsumerWithException) (Boolean isRecoveredJobRunning) -> {
 
 Review comment:
   Imo we are doing this wrong. The code would be much more readable with 
static factory methods:
   
   ```
   /**
    * {@link Consumer} that can throw checked exceptions.
    */
   @FunctionalInterface
   public interface CheckedConsumer<T> {
   
       void checkedAccept(T t) throws Exception;
   
       static <T> Consumer<T> unchecked(CheckedConsumer<T> checkedConsumer) {
           return (t) -> {
               try {
                   checkedConsumer.checkedAccept(t);
               } catch (Exception e) {
                   ExceptionUtils.rethrow(e);
               }
           };
       }
   }
   ```
   This allows for:
   ```
   CheckedConsumer.unchecked(isRecoveredJobRunning -> {
       ...
   });
   ``` 
   No casts are required. Also, when interacting with the Java API, it does not 
matter what exact type of exception can be thrown; what matters is that the 
checked exception becomes an unchecked one. We do not need to generify the 
exception type in `ConsumerWithException`. 



[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612350#comment-16612350
 ] 

ASF GitHub Bot commented on FLINK-10255:


GJL commented on a change in pull request #6678: [FLINK-10255] Only react to 
onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217082586
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -879,24 +917,66 @@ public void handleError(final Exception exception) {
 
@Override
public void onAddedJobGraph(final JobID jobId) {
-   final CompletableFuture recoveredJob = 
getRpcService().execute(
-   () -> submittedJobGraphStore.recoverJobGraph(jobId));
-
-   final CompletableFuture submissionFuture = 
recoveredJob.thenComposeAsync(
-   (SubmittedJobGraph submittedJobGraph) -> 
submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT),
-   getMainThreadExecutor());
-
-   submissionFuture.whenComplete(
-   (Acknowledge acknowledge, Throwable throwable) -> {
-   if (throwable != null) {
-   onFatalError(
-   new DispatcherException(
-   String.format("Could 
not start the added job %s", jobId),
-   
ExceptionUtils.stripCompletionException(throwable)));
+   runAsync(
+   () -> {
+   if (!jobManagerRunners.containsKey(jobId)) {
+   final CompletableFuture 
recoveredJob = recoveryOperation.thenApplyAsync(
+   ignored -> {
+   try {
+   return 
recoverJob(jobId);
+   } catch (Exception e) {
+   
ExceptionUtils.rethrow(e);
+   }
+   return null;
+   },
+   getRpcService().getExecutor());
+
+   final DispatcherId dispatcherId = 
getFencingToken();
+   final CompletableFuture 
submissionFuture = recoveredJob.thenComposeAsync(
+   
(FunctionWithThrowable, Exception>) (JobGraph 
jobGraph) -> tryRunRecoveredJobGraph(jobGraph, dispatcherId)
+   .thenAcceptAsync(
+   
(ConsumerWithException) (Boolean isRecoveredJobRunning) -> {
 
 Review comment:
   Imo we are doing this wrong. The code would be much more readable with 
static factory methods:
   
   ```
   /**
    * {@link Consumer} that can throw checked exceptions.
    */
   @FunctionalInterface
   public interface CheckedConsumer<T> {
   
       void checkedAccept(T t) throws Exception;
   
       static <T> Consumer<T> unchecked(CheckedConsumer<T> checkedConsumer) {
           return (t) -> {
               try {
                   checkedConsumer.checkedAccept(t);
               } catch (Exception e) {
                   ExceptionUtils.rethrow(e);
               }
           };
       }
   }
   ```
   This allows for:
   ```
   .thenAcceptAsync(CheckedConsumer.unchecked(isRecoveredJobRunning -> {
       ...
   }));
   ``` 
   No casts are required. Also, when interacting with the Java API, it does not 
matter what exact type of exception can be thrown; what matters is that the 
checked exception becomes an unchecked one. We do not need to generify the 
exception type in `ConsumerWithException`. 



[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612347#comment-16612347
 ] 

ASF GitHub Bot commented on FLINK-10255:


GJL commented on a change in pull request #6678: [FLINK-10255] Only react to 
onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217082586
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -879,24 +917,66 @@ public void handleError(final Exception exception) {
 
@Override
public void onAddedJobGraph(final JobID jobId) {
-   final CompletableFuture recoveredJob = 
getRpcService().execute(
-   () -> submittedJobGraphStore.recoverJobGraph(jobId));
-
-   final CompletableFuture submissionFuture = 
recoveredJob.thenComposeAsync(
-   (SubmittedJobGraph submittedJobGraph) -> 
submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT),
-   getMainThreadExecutor());
-
-   submissionFuture.whenComplete(
-   (Acknowledge acknowledge, Throwable throwable) -> {
-   if (throwable != null) {
-   onFatalError(
-   new DispatcherException(
-   String.format("Could 
not start the added job %s", jobId),
-   
ExceptionUtils.stripCompletionException(throwable)));
+   runAsync(
+   () -> {
+   if (!jobManagerRunners.containsKey(jobId)) {
+   final CompletableFuture 
recoveredJob = recoveryOperation.thenApplyAsync(
+   ignored -> {
+   try {
+   return 
recoverJob(jobId);
+   } catch (Exception e) {
+   
ExceptionUtils.rethrow(e);
+   }
+   return null;
+   },
+   getRpcService().getExecutor());
+
+   final DispatcherId dispatcherId = 
getFencingToken();
+   final CompletableFuture 
submissionFuture = recoveredJob.thenComposeAsync(
+   
(FunctionWithThrowable, Exception>) (JobGraph 
jobGraph) -> tryRunRecoveredJobGraph(jobGraph, dispatcherId)
+   .thenAcceptAsync(
+   
(ConsumerWithException) (Boolean isRecoveredJobRunning) -> {
 
 Review comment:
   Imo we are doing this wrong. The API would be much more readable with static 
factory methods:
   
   ```
   /**
    * {@link Consumer} that can throw checked exceptions.
    */
   @FunctionalInterface
   public interface CheckedConsumer<T> {
   
       void checkedAccept(T t) throws Exception;
   
       static <T> Consumer<T> unchecked(CheckedConsumer<T> checkedConsumer) {
           return (t) -> {
               try {
                   checkedConsumer.checkedAccept(t);
               } catch (Exception e) {
                   ExceptionUtils.rethrow(e);
               }
           };
       }
   }
   ```
   This allows for:
   ```
   CheckedConsumer.unchecked(isRecoveredJobRunning -> {
       ...
   });
   ``` 
   No casts are required. Also, when interacting with the Java API, it does not 
matter what exact type of exception can be thrown; what matters is that the 
checked exception becomes an unchecked one. We do not need to generify the 
exception type in `ConsumerWithException`. 



[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612345#comment-16612345
 ] 

ASF GitHub Bot commented on FLINK-10255:


GJL commented on a change in pull request #6678: [FLINK-10255] Only react to 
onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217082714
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -879,24 +917,66 @@ public void handleError(final Exception exception) {
 
@Override
public void onAddedJobGraph(final JobID jobId) {
-   final CompletableFuture recoveredJob = 
getRpcService().execute(
-   () -> submittedJobGraphStore.recoverJobGraph(jobId));
-
-   final CompletableFuture submissionFuture = 
recoveredJob.thenComposeAsync(
-   (SubmittedJobGraph submittedJobGraph) -> 
submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT),
-   getMainThreadExecutor());
-
-   submissionFuture.whenComplete(
-   (Acknowledge acknowledge, Throwable throwable) -> {
-   if (throwable != null) {
-   onFatalError(
-   new DispatcherException(
-   String.format("Could 
not start the added job %s", jobId),
-   
ExceptionUtils.stripCompletionException(throwable)));
+   runAsync(
+   () -> {
+   if (!jobManagerRunners.containsKey(jobId)) {
+   final CompletableFuture 
recoveredJob = recoveryOperation.thenApplyAsync(
+   ignored -> {
+   try {
+   return 
recoverJob(jobId);
+   } catch (Exception e) {
+   
ExceptionUtils.rethrow(e);
+   }
+   return null;
+   },
+   getRpcService().getExecutor());
+
+   final DispatcherId dispatcherId = 
getFencingToken();
+   final CompletableFuture 
submissionFuture = recoveredJob.thenComposeAsync(
+   
(FunctionWithThrowable, Exception>) (JobGraph 
jobGraph) -> tryRunRecoveredJobGraph(jobGraph, dispatcherId)
 
 Review comment:
   See my comment regarding the `ConsumerWithException`.



[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612344#comment-16612344
 ] 

ASF GitHub Bot commented on FLINK-10255:


GJL commented on a change in pull request #6678: [FLINK-10255] Only react to 
onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217082586
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -879,24 +917,66 @@ public void handleError(final Exception exception) {
 
@Override
public void onAddedJobGraph(final JobID jobId) {
-   final CompletableFuture recoveredJob = 
getRpcService().execute(
-   () -> submittedJobGraphStore.recoverJobGraph(jobId));
-
-   final CompletableFuture submissionFuture = 
recoveredJob.thenComposeAsync(
-   (SubmittedJobGraph submittedJobGraph) -> 
submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT),
-   getMainThreadExecutor());
-
-   submissionFuture.whenComplete(
-   (Acknowledge acknowledge, Throwable throwable) -> {
-   if (throwable != null) {
-   onFatalError(
-   new DispatcherException(
-   String.format("Could 
not start the added job %s", jobId),
-   
ExceptionUtils.stripCompletionException(throwable)));
+   runAsync(
+   () -> {
+   if (!jobManagerRunners.containsKey(jobId)) {
+   final CompletableFuture 
recoveredJob = recoveryOperation.thenApplyAsync(
+   ignored -> {
+   try {
+   return 
recoverJob(jobId);
+   } catch (Exception e) {
+   
ExceptionUtils.rethrow(e);
+   }
+   return null;
+   },
+   getRpcService().getExecutor());
+
+   final DispatcherId dispatcherId = 
getFencingToken();
+   final CompletableFuture 
submissionFuture = recoveredJob.thenComposeAsync(
+   
(FunctionWithThrowable, Exception>) (JobGraph 
jobGraph) -> tryRunRecoveredJobGraph(jobGraph, dispatcherId)
+   .thenAcceptAsync(
+   
(ConsumerWithException) (Boolean isRecoveredJobRunning) -> {
 
 Review comment:
   Imo we are doing this wrong. The API would be much more readable with static 
factory methods:
   
   ```
   /**
    * {@link Consumer} that can throw checked exceptions.
    */
   @FunctionalInterface
   public interface CheckedConsumer<T> {
   
       void checkedAccept(T t) throws Exception;
   
       static <T> Consumer<T> unchecked(CheckedConsumer<T> checkedConsumer) {
           return (t) -> {
               try {
                   checkedConsumer.checkedAccept(t);
               } catch (Exception e) {
                   ExceptionUtils.rethrow(e);
               }
           };
       }
   }
   ```
   This allows for:
   ```
   CheckedConsumer.unchecked(isRecoveredJobRunning -> {
       ...
   });
   ``` 
   No casts are required. Also, when interacting with the Java API, it does not 
matter what type of exception can be thrown. We do not need to generify the 
exception type in `ConsumerWithException`. 



[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612319#comment-16612319
 ] 

ASF GitHub Bot commented on FLINK-10255:


GJL commented on a change in pull request #6678: [FLINK-10255] Only react to 
onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217077413
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -861,14 +861,18 @@ public void grantLeadership(final UUID 
newLeaderSessionID) {
getMainThreadExecutor());
}
 
-   protected CompletableFuture getJobTerminationFuture(JobID jobId) {
+   CompletableFuture getJobTerminationFuture(JobID jobId) {
 
 Review comment:
   True, I missed it.




> Standby Dispatcher locks submitted JobGraphs
> 
>
> Key: FLINK-10255
> URL: https://issues.apache.org/jira/browse/FLINK-10255
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.1, 1.7.0, 1.5.4
>
>
> Currently, standby {{Dispatchers}} lock submitted {{JobGraphs}} which are 
> added to the {{SubmittedJobGraphStore}} if HA mode is enabled. Locking the 
> {{JobGraphs}} can prevent their cleanup leaving the system in an inconsistent 
> state.
> The problem is that we recover in the 
> {{SubmittedJobGraphListener#onAddedJobGraph}} callback which is also called 
> if don't have the leadership the newly added {{JobGraph}}. Recovering the 
> {{JobGraph}} currently locks the {{JobGraph}}. In case that the 
> {{Dispatcher}} is not the leader, then we won't start that job after its 
> recovery. However, we also don't release the {{JobGraph}} leaving it locked.
> There are two possible solutions to the problem. Either we check whether we 
> are the leader before recovering jobs or we say that recovering jobs does not 
> lock them. Only if we can submit the recovered job we lock them. The latter 
> approach has the advantage that it follows a quite similar code path as an 
> initial job submission. Moreover, jobs are currently also recovered at other 
> places. In all these places we currently would need to release the 
> {{JobGraphs}} if we cannot submit the recovered {{JobGraph}} (e.g. 
> {{Dispatcher#grantLeadership}}).
> An extension of the first solution could be to stop the 
> {{SubmittedJobGraphStore}} while the {{Dispatcher}} is not the leader. Then 
> we would have to make sure that no concurrent callback from the 
> {{SubmittedJobGraphStore#SubmittedJobGraphListener}} can be executed after 
> revoking leadership from the {{Dispatcher}}.
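
The first solution described above (check for leadership before recovering a job) can be sketched in isolation as follows. This is only a minimal model under stated assumptions, not the Dispatcher code: `LeaderGuardSketch`, its fields, and the use of plain `String` job IDs are hypothetical, and the real callback would additionally have to run on the Dispatcher's main thread so that the leadership check and the recovery cannot race with a leadership change.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

final class LeaderGuardSketch {

    // Hypothetical stand-in for the fencing token; empty means "standby".
    private Optional<UUID> leaderSessionId = Optional.empty();

    // Jobs this instance has recovered (and would therefore hold a lock on).
    private final List<String> recoveredJobs = new ArrayList<>();

    void grantLeadership(UUID newLeaderSessionId) {
        leaderSessionId = Optional.of(newLeaderSessionId);
    }

    void revokeLeadership() {
        leaderSessionId = Optional.empty();
    }

    /**
     * Callback for a newly added job graph. A standby instance ignores the
     * signal, so it never recovers (and never locks) the job graph.
     *
     * @return true if the job was recovered by this instance
     */
    boolean onAddedJobGraph(String jobId) {
        if (!leaderSessionId.isPresent()) {
            return false;
        }
        if (recoveredJobs.contains(jobId)) {
            // Already known to this leader; nothing to do.
            return false;
        }
        recoveredJobs.add(jobId);
        return true;
    }

    List<String> recoveredJobs() {
        return recoveredJobs;
    }

    public static void main(String[] args) {
        LeaderGuardSketch dispatcher = new LeaderGuardSketch();
        System.out.println(dispatcher.onAddedJobGraph("job-1"));
        dispatcher.grantLeadership(UUID.randomUUID());
        System.out.println(dispatcher.onAddedJobGraph("job-2"));
    }
}
```

Because the standby never touches the job graph, there is nothing left to release when it is not the leader, which is the inconsistency the issue describes.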



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612283#comment-16612283
 ] 

ASF GitHub Bot commented on FLINK-10255:


tillrohrmann commented on a change in pull request #6678: [FLINK-10255] Only 
react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217068791
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -861,14 +861,18 @@ public void grantLeadership(final UUID 
newLeaderSessionID) {
getMainThreadExecutor());
}
 
-   protected CompletableFuture getJobTerminationFuture(JobID jobId) {
+   CompletableFuture getJobTerminationFuture(JobID jobId) {
if (jobManagerRunners.containsKey(jobId)) {
return FutureUtils.completedExceptionally(new 
DispatcherException(String.format("Job with job id %s is still running.", 
jobId)));
} else {
return jobManagerTerminationFutures.getOrDefault(jobId, 
CompletableFuture.completedFuture(null));
}
}
 
+   CompletableFuture getRecoveryOperation() {
 
 Review comment:
   Good point. Will add it.




[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612280#comment-16612280
 ] 

ASF GitHub Bot commented on FLINK-10255:


tillrohrmann commented on a change in pull request #6678: [FLINK-10255] Only 
react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217068649
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -861,14 +861,18 @@ public void grantLeadership(final UUID 
newLeaderSessionID) {
getMainThreadExecutor());
}
 
-   protected CompletableFuture getJobTerminationFuture(JobID jobId) {
+   CompletableFuture getJobTerminationFuture(JobID jobId) {
 
 Review comment:
   It cannot be private since the `TestingDispatcher` needs to access it.




[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612282#comment-16612282
 ] 

ASF GitHub Bot commented on FLINK-10255:


tillrohrmann commented on a change in pull request #6678: [FLINK-10255] Only 
react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217068649
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -861,14 +861,18 @@ public void grantLeadership(final UUID 
newLeaderSessionID) {
getMainThreadExecutor());
}
 
-   protected CompletableFuture getJobTerminationFuture(JobID jobId) {
+   CompletableFuture getJobTerminationFuture(JobID jobId) {
 
 Review comment:
   It cannot be private since the `TestingDispatcher` needs to access it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Standby Dispatcher locks submitted JobGraphs
> 
>
> Key: FLINK-10255
> URL: https://issues.apache.org/jira/browse/FLINK-10255
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.1, 1.7.0, 1.5.4
>
>
> Currently, standby {{Dispatchers}} lock submitted {{JobGraphs}} which are 
> added to the {{SubmittedJobGraphStore}} if HA mode is enabled. Locking the 
> {{JobGraphs}} can prevent their cleanup leaving the system in an inconsistent 
> state.
> The problem is that we recover in the 
> {{SubmittedJobGraphListener#onAddedJobGraph}} callback which is also called 
> if don't have the leadership the newly added {{JobGraph}}. Recovering the 
> {{JobGraph}} currently locks the {{JobGraph}}. In case that the 
> {{Dispatcher}} is not the leader, then we won't start that job after its 
> recovery. However, we also don't release the {{JobGraph}} leaving it locked.
> There are two possible solutions to the problem. Either we check whether we 
> are the leader before recovering jobs or we say that recovering jobs does not 
> lock them. Only if we can submit the recovered job we lock them. The latter 
> approach has the advantage that it follows a quite similar code path as an 
> initial job submission. Moreover, jobs are currently also recovered at other 
> places. In all these places we currently would need to release the 
> {{JobGraphs}} if we cannot submit the recovered {{JobGraph}} (e.g. 
> {{Dispatcher#grantLeadership}}).
> An extension of the first solution could be to stop the 
> {{SubmittedJobGraphStore}} while the {{Dispatcher}} is not the leader. Then 
> we would have to make sure that no concurrent callback from the 
> {{SubmittedJobGraphStore#SubmittedJobGraphListener}} can be executed after 
> revoking leadership from the {{Dispatcher}}.
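
The first solution described above (check leadership before recovering) can be sketched roughly as follows. This is a minimal illustration, not Flink's actual Dispatcher code: the class `LeaderAwareDispatcherSketch`, its methods, and the use of plain strings as job ids are all hypothetical, and "locking" is modeled as membership in an in-memory set.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical, simplified stand-in for a dispatcher; not Flink's Dispatcher. */
class LeaderAwareDispatcherSketch {
    // Job ids whose job graphs this instance currently holds a lock on (assumed model).
    private final Set<String> lockedJobIds = new HashSet<>();
    private boolean leader;

    void grantLeadership() {
        leader = true;
    }

    void revokeLeadership() {
        leader = false;
    }

    /** Callback that fires on every instance, leader or standby, when a job graph is added. */
    void onAddedJobGraph(String jobId) {
        if (!leader) {
            // Standby: skip recovery entirely, so no lock is ever taken.
            return;
        }
        // Leader: recovering the job graph takes the lock.
        lockedJobIds.add(jobId);
    }

    boolean holdsLock(String jobId) {
        return lockedJobIds.contains(jobId);
    }
}
```

With this guard, a standby instance that receives the callback never acquires a lock it would later fail to release.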



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612279#comment-16612279
 ] 

ASF GitHub Bot commented on FLINK-10255:


tillrohrmann commented on a change in pull request #6678: [FLINK-10255] Only 
react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217068175
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
 ##
 @@ -80,9 +80,16 @@ void completeJobExecution(ArchivedExecutionGraph 
archivedExecutionGraph) {
}
 
@VisibleForTesting
-   public CompletableFuture getJobTerminationFuture(@Nonnull JobID 
jobId, @Nonnull Time timeout) {
+   CompletableFuture getJobTerminationFuture(@Nonnull JobID jobId, 
@Nonnull Time timeout) {
return callAsyncWithoutFencing(
() -> getJobTerminationFuture(jobId),
timeout).thenCompose(Function.identity());
}
+
+   @VisibleForTesting
 
 Review comment:
   You're right. Will change it.




[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612229#comment-16612229
 ] 

ASF GitHub Bot commented on FLINK-10255:


tillrohrmann commented on a change in pull request #6678: [FLINK-10255] Only 
react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217058569
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/util/function/FunctionWithThrowable.java
 ##
 @@ -0,0 +1,48 @@
+/*
 
 Review comment:
   Yes, I will rearrange the commits before merging. Locally 
3a07ee8812afb3feb2233d722311c4de08eef09a is before 
868c7dd6822cee24046b486874fef5d5d7d3564d which makes it work.




[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612198#comment-16612198
 ] 

ASF GitHub Bot commented on FLINK-10255:


GJL commented on a change in pull request #6678: [FLINK-10255] Only react to 
onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217043254
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/util/function/FunctionWithThrowable.java
 ##
 @@ -0,0 +1,48 @@
+/*
 
 Review comment:
   I think adding this file should be in a commit that is before `[FLINK-10255] 
Only react to onAddedJobGraph signal when being leader`, or it should be 
squashed. Without this class your previous commits would not work.




[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612195#comment-16612195
 ] 

ASF GitHub Bot commented on FLINK-10255:


GJL commented on a change in pull request #6678: [FLINK-10255] Only react to 
onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217046190
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -861,14 +861,18 @@ public void grantLeadership(final UUID 
newLeaderSessionID) {
getMainThreadExecutor());
}
 
-   protected CompletableFuture getJobTerminationFuture(JobID jobId) {
+   CompletableFuture getJobTerminationFuture(JobID jobId) {
if (jobManagerRunners.containsKey(jobId)) {
return FutureUtils.completedExceptionally(new 
DispatcherException(String.format("Job with job id %s is still running.", 
jobId)));
} else {
return jobManagerTerminationFutures.getOrDefault(jobId, 
CompletableFuture.completedFuture(null));
}
}
 
+   CompletableFuture getRecoveryOperation() {
 
 Review comment:
   This method has wider visibility scope than necessary, and is part of 
production code. I think `@VisibleForTesting` should be added.
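
The visibility pattern under discussion (a package-private accessor in production code, marked as test-only and used by a testing subclass in the same package) might look like the sketch below. All names here are hypothetical: the annotation is declared locally so the example compiles on its own, and `DispatcherSketch` / `TestingDispatcherSketch` only stand in for the real classes.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;
import java.util.concurrent.CompletableFuture;

/** Hypothetical marker annotation, declared locally for this sketch. */
@Documented
@Target({ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR, ElementType.TYPE})
@interface VisibleForTesting {}

/** Stand-in for the production class. */
class DispatcherSketch {
    private final CompletableFuture<Void> recoveryOperation =
            CompletableFuture.completedFuture(null);

    // Package-private rather than private, so a test helper in the same
    // package can call it; the annotation documents that intent.
    @VisibleForTesting
    CompletableFuture<Void> getRecoveryOperation() {
        return recoveryOperation;
    }
}

/** Stand-in for the test utility that needs access to the accessor. */
class TestingDispatcherSketch extends DispatcherSketch {
    boolean recoveryFinished() {
        return getRecoveryOperation().isDone();
    }
}
```

The annotation changes nothing at runtime; it only signals to readers and static analysis that the wider-than-private scope exists for tests.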




[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612197#comment-16612197
 ] 

ASF GitHub Bot commented on FLINK-10255:


GJL commented on a change in pull request #6678: [FLINK-10255] Only react to 
onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217044938
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
 ##
 @@ -80,9 +80,16 @@ void completeJobExecution(ArchivedExecutionGraph 
archivedExecutionGraph) {
}
 
@VisibleForTesting
-   public CompletableFuture getJobTerminationFuture(@Nonnull JobID 
jobId, @Nonnull Time timeout) {
+   CompletableFuture getJobTerminationFuture(@Nonnull JobID jobId, 
@Nonnull Time timeout) {
return callAsyncWithoutFencing(
() -> getJobTerminationFuture(jobId),
timeout).thenCompose(Function.identity());
}
+
+   @VisibleForTesting
 
 Review comment:
   Not sure about `@VisibleForTesting` here. This class is already a test 
utility. It is even in the `test` sources directory.




[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612196#comment-16612196
 ] 

ASF GitHub Bot commented on FLINK-10255:


GJL commented on a change in pull request #6678: [FLINK-10255] Only react to 
onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217045367
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##
 @@ -861,14 +861,18 @@ public void grantLeadership(final UUID 
newLeaderSessionID) {
getMainThreadExecutor());
}
 
-   protected CompletableFuture getJobTerminationFuture(JobID jobId) {
+   CompletableFuture getJobTerminationFuture(JobID jobId) {
 
 Review comment:
   Should be `private`.




[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612185#comment-16612185
 ] 

ASF GitHub Bot commented on FLINK-10255:


GJL commented on a change in pull request #6678: [FLINK-10255] Only react to 
onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217043254
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/util/function/FunctionWithThrowable.java
 ##
 @@ -0,0 +1,48 @@
+/*
 
 Review comment:
   I think this commit should be before `[FLINK-10255] Only react to 
onAddedJobGraph signal when being leader`, or it should be squashed (`fixup`). 
Without this class your previous commits would not work.




[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612186#comment-16612186
 ] 

ASF GitHub Bot commented on FLINK-10255:


GJL commented on a change in pull request #6678: [FLINK-10255] Only react to 
onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217043254
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/util/function/FunctionWithThrowable.java
 ##
 @@ -0,0 +1,48 @@
+/*
 
 Review comment:
   I think this commit should be before `[FLINK-10255] Only react to 
onAddedJobGraph signal when being leader`, or it should be squashed. Without 
this class your previous commits would not work.




[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16612172#comment-16612172
 ] 

ASF GitHub Bot commented on FLINK-10255:


GJL commented on a change in pull request #6678: [FLINK-10255] Only react to 
onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217043254
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/util/function/FunctionWithThrowable.java
 ##
 @@ -0,0 +1,48 @@
+/*
 
 Review comment:
   I think this commit should be before `[FLINK-10255] Only react to 
onAddedJobGraph signal when being leader`, or it should be squashed.




[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16610263#comment-16610263
 ] 

ASF GitHub Bot commented on FLINK-10255:


tillrohrmann commented on issue #6678: [FLINK-10255] Only react to 
onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#issuecomment-420190020
 
 
   Thanks for the comments @TisonKun. I've fixed the failing 
`DispatcherTest#testOnAddedJobGraphWithFinishedJob`.




[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16610223#comment-16610223
 ] 

ASF GitHub Bot commented on FLINK-10255:


tillrohrmann commented on a change in pull request #6678: [FLINK-10255] Only 
react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r216575018
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
 ##
 @@ -85,5 +87,13 @@
return FutureUtils.completeAll(terminationFutures);
}
 
+   public static void stopActor(AkkaActorGateway akkaActorGateway) {
 
 Review comment:
   Good point. I will clean this up as a follow up.




[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16610117#comment-16610117
 ] 

ASF GitHub Bot commented on FLINK-10255:


TisonKun commented on issue #6678: [FLINK-10255] Only react to onAddedJobGraph 
signal when being leader
URL: https://github.com/apache/flink/pull/6678#issuecomment-420151979
 
 
   Travis shows relevant failures; I will take a closer look later.
   
   ```
   
testGrantingRevokingLeadership(org.apache.flink.runtime.dispatcher.DispatcherHATest)
  Time elapsed: 0.024 sec  <<< ERROR!
   org.apache.flink.runtime.util.TestingFatalErrorHandler$TestingException: 
java.lang.UnsupportedOperationException: Should not be called.
at 
org.apache.flink.runtime.util.TestingFatalErrorHandler.rethrowError(TestingFatalErrorHandler.java:51)
at 
org.apache.flink.runtime.dispatcher.DispatcherHATest.teardown(DispatcherHATest.java:98)
   Caused by: java.lang.UnsupportedOperationException: Should not be called.
at 
org.apache.flink.runtime.dispatcher.DispatcherHATest$BlockingSubmittedJobGraphStore.releaseJobGraph(DispatcherHATest.java:306)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$26(Dispatcher.java:809)
at 
org.apache.flink.util.function.BiFunctionWithException.apply(BiFunctionWithException.java:49)
at 
java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1105)
at 
java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1070)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
   ```
   
   ```
   Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.45 sec <<< 
FAILURE! - in org.apache.flink.runtime.dispatcher.DispatcherHATest
   
testGrantingRevokingLeadership(org.apache.flink.runtime.dispatcher.DispatcherHATest)
  Time elapsed: 0.028 sec  <<< ERROR!
   org.apache.flink.runtime.util.TestingFatalErrorHandler$TestingException: 
java.lang.UnsupportedOperationException: Should not be called.
at 
org.apache.flink.runtime.util.TestingFatalErrorHandler.rethrowError(TestingFatalErrorHandler.java:51)
at 
org.apache.flink.runtime.dispatcher.DispatcherHATest.teardown(DispatcherHATest.java:98)
   Caused by: java.lang.UnsupportedOperationException: Should not be called.
at 
org.apache.flink.runtime.dispatcher.DispatcherHATest$BlockingSubmittedJobGraphStore.releaseJobGraph(DispatcherHATest.java:306)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$26(Dispatcher.java:809)
at 
org.apache.flink.util.function.BiFunctionWithException.apply(BiFunctionWithException.java:49)
at 
java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1105)
at 
java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1070)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
   ```
   
   ```
   A TaskManager should go into a clean state in case of a JobManager 
failure(org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase)  
Time elapsed: 121.247 sec  <<< FAILURE!
   java.lang.AssertionError: assertion failed: timeout (119585594930 
nanoseconds) during expectMsg while waiting for Acknowledge
at scala.Predef$.assert(Predef.scala:170)
at akka.testkit.TestKitBase$class.expectMsg_internal(TestKit.scala:387)
at akka.testkit.TestKitBase$class.expectMsg(TestKit.scala:364)
at akka.testkit.TestKit.expectMsg(TestKit.scala:814)
at 
org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcV$sp$4.apply$mcV$sp(JobManagerFailsITCase.scala:118)
at 

[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16610111#comment-16610111
 ] 

ASF GitHub Bot commented on FLINK-10255:


TisonKun commented on a change in pull request #6678: [FLINK-10255] Only react 
to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r216548720
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
 ##
 @@ -85,5 +87,13 @@
return FutureUtils.completeAll(terminationFutures);
}
 
+   public static void stopActor(AkkaActorGateway akkaActorGateway) {
 
 Review comment:
   This PR introduces `stopActor`, which is used in only one place. After 
checking the whole project, I found that we have defined many `stopActor` 
helpers here and there. Most usages are in `TestingUtils`, but there are also 
some in `MesosResourceManager` and `FlinkUntypedActorTest`. Some use 
`PoisonPill` and others use `Kill`. 
   Independent of this PR, since everything that interacts with Akka depends on 
`flink-runtime`, let's unify the `stopActor` utilities.
   I think `ActorUtils` is the best place for them.




[jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs

2018-09-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16609200#comment-16609200
 ] 

ASF GitHub Bot commented on FLINK-10255:


tillrohrmann opened a new pull request #6678: [FLINK-10255] Only react to 
onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678
 
 
   ## What is the purpose of the change
   
   The Dispatcher should only react to the onAddedJobGraph signal if it is the 
leader.
   In all other cases the signal should be ignored since the jobs will be 
recovered once
   the Dispatcher becomes the leader.
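
The leader check described above can be illustrated with a minimal, single-threaded sketch. It is not the actual Flink code: `LockingJobGraphStore`, `LeaderAwareDispatcher` and the string job ids are hypothetical stand-ins, and the only assumption carried over from the issue is that recovering a job graph also locks it.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical store: recovering a job graph also locks it for this process. */
final class LockingJobGraphStore {
    private final Set<String> locked = new HashSet<>();

    String recoverJobGraph(String jobId) {
        locked.add(jobId); // the lock is a side effect of recovery
        return "JobGraph(" + jobId + ")";
    }

    boolean isLocked(String jobId) {
        return locked.contains(jobId);
    }
}

/** Hypothetical dispatcher that ignores onAddedJobGraph while it is a standby. */
final class LeaderAwareDispatcher {
    private final LockingJobGraphStore store;
    private final Set<String> runningJobs = new HashSet<>();
    private boolean leader;

    LeaderAwareDispatcher(LockingJobGraphStore store) {
        this.store = store;
    }

    void grantLeadership() {
        leader = true;
    }

    void revokeLeadership() {
        leader = false;
    }

    void onAddedJobGraph(String jobId) {
        if (!leader) {
            // Standby: skip recovery entirely, so no lock is taken.
            return;
        }
        store.recoverJobGraph(jobId);
        runningJobs.add(jobId);
    }

    boolean isRunning(String jobId) {
        return runningJobs.contains(jobId);
    }
}

final class LeaderCheckDemo {
    public static void main(String[] args) {
        LockingJobGraphStore store = new LockingJobGraphStore();
        LeaderAwareDispatcher dispatcher = new LeaderAwareDispatcher(store);

        dispatcher.onAddedJobGraph("job-1"); // ignored while standby
        System.out.println("locked while standby: " + store.isLocked("job-1"));

        dispatcher.grantLeadership();
        dispatcher.onAddedJobGraph("job-1"); // recovered and locked as leader
        System.out.println("locked as leader: " + store.isLocked("job-1"));
    }
}
```

The sketch leaves out what happens to jobs that were added while the dispatcher was a standby; as stated above, those would be recovered once the dispatcher becomes the leader.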
   
   In order to still support non-blocking job recoveries, this commit 
serializes all
   recovery operations by introducing a recoveryOperation future which first 
needs to
   complete before a subsequent operation is started. That way we can avoid 
race conditions
   between granting and revoking leadership as well as the onAddedJobGraph 
signals. This is
   important since we can only lock each JobGraph once and, thus, need to make 
sure that
   we don't release a lock of a properly recovered job in a concurrent 
operation.
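
One way to picture the serialization is to chain every new operation onto the future of the previous one. The sketch below uses plain `java.util.concurrent.CompletableFuture`; the class `SerializedRecovery` and its `enqueue` method are hypothetical names chosen for illustration, and letting the chain continue after a failed operation is an assumption of this sketch, not necessarily what the commit does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical helper that starts each asynchronous operation only after the previous one finished. */
final class SerializedRecovery {
    // Tail of the chain; plays the role of the recoveryOperation future.
    private CompletableFuture<Void> recoveryOperation = CompletableFuture.completedFuture(null);

    synchronized <T> CompletableFuture<T> enqueue(Supplier<CompletableFuture<T>> operation) {
        // The supplier is invoked only once the current tail has completed.
        CompletableFuture<T> result = recoveryOperation.thenCompose(ignored -> operation.get());
        // Assumption of this sketch: a failed operation must not block later ones,
        // so the new tail completes normally regardless of the outcome.
        recoveryOperation = result.<Void>handle((value, error) -> null);
        return result;
    }
}

final class SerializedRecoveryDemo {
    public static void main(String[] args) {
        SerializedRecovery recovery = new SerializedRecovery();
        List<String> log = new ArrayList<>();

        // First operation stays pending until we complete it by hand.
        CompletableFuture<String> pending = new CompletableFuture<>();
        recovery.enqueue(() -> {
            log.add("grantLeadership started");
            return pending;
        });

        // Second operation is queued behind the first one.
        recovery.enqueue(() -> {
            log.add("onAddedJobGraph started");
            return CompletableFuture.completedFuture("done");
        });

        System.out.println("before completion: " + log);
        pending.complete("recovered");
        System.out.println("after completion: " + log);
    }
}
```

Because the second supplier is not invoked until the first future completes, a concurrent operation in this model cannot release a lock that an in-flight recovery is about to rely on.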
   
   cc @GJL 
   
   ## Brief change log
   
   - Only react to `SubmittedJobGraphListener#onAddedJobGraph` when being the 
leader
   - Serialize recovery operations by introducing a `recoveryOperation` future 
in order to avoid wrong unlocking of guarded resources
   
   ## Verifying this change
   
   - Added `ZooKeeperHADispatcherTest#testStandbyDispatcherJobExecution` and 
`ZooKeeperHADispatcherTest#testStandbyDispatcherJobRecovery`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   

