[jira] [Commented] (FLINK-5971) JobLeaderIdService should time out registered jobs
[ https://issues.apache.org/jira/browse/FLINK-5971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15907507#comment-15907507 ]

ASF GitHub Bot commented on FLINK-5971:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3488

> JobLeaderIdService should time out registered jobs
> --
>
>                 Key: FLINK-5971
>                 URL: https://issues.apache.org/jira/browse/FLINK-5971
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.3.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>              Labels: flip-6
>             Fix For: 1.3.0
>
>
> The {{JobLeaderIdService}} has no mechanism to time out inactive jobs. At the
> moment it relies on the {{RunningJobsRegistry}} which only gives a heuristic
> answer.
> We should remove the {{RunningJobsRegistry}} and register instead a timeout
> for each job which does not have a job leader associated.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (FLINK-5971) JobLeaderIdService should time out registered jobs
[ https://issues.apache.org/jira/browse/FLINK-5971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15907502#comment-15907502 ]

ASF GitHub Bot commented on FLINK-5971:
---

Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/3488

    Merging this PR.
[jira] [Commented] (FLINK-5971) JobLeaderIdService should time out registered jobs
[ https://issues.apache.org/jira/browse/FLINK-5971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15905211#comment-15905211 ]

ASF GitHub Bot commented on FLINK-5971:
---

Github user KurtYoung commented on the issue:

    https://github.com/apache/flink/pull/3488

    +1 to merge.
[jira] [Commented] (FLINK-5971) JobLeaderIdService should time out registered jobs
[ https://issues.apache.org/jira/browse/FLINK-5971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902785#comment-15902785 ]

ASF GitHub Bot commented on FLINK-5971:
---

Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/3488

    Thanks a lot for your review @KurtYoung. I've addressed your comments and updated the PR.
[jira] [Commented] (FLINK-5971) JobLeaderIdService should time out registered jobs
[ https://issues.apache.org/jira/browse/FLINK-5971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902757#comment-15902757 ]

ASF GitHub Bot commented on FLINK-5971:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3488#discussion_r105118398

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java ---
    @@ -283,5 +302,31 @@ public void handleError(Exception exception) {
                         JobLeaderIdListener.class.getSimpleName(), exception);
                 }
             }
    +
    +        private void activateTimeout() {
    +            if (timeoutId != null) {
    +                cancelTimeout();
    +            }
    +
    +            final UUID newTimeoutId = UUID.randomUUID();
    +
    +            timeoutId = newTimeoutId;
    +
    +            timeoutFuture = scheduledExecutor.schedule(new Runnable() {
    +                @Override
    +                public void run() {
    +                    listenerJobLeaderIdActions.notifyJobTimeout(jobId, newTimeoutId);
    --- End diff --

    Yes I think you're right. Good point. Will fix the problem.
[jira] [Commented] (FLINK-5971) JobLeaderIdService should time out registered jobs
[ https://issues.apache.org/jira/browse/FLINK-5971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902753#comment-15902753 ]

ASF GitHub Bot commented on FLINK-5971:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3488#discussion_r105117904

    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java ---
    @@ -0,0 +1,37 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.configuration;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +
    +/**
    + * The set of configuration options relating to the ResourceManager
    + */
    +@PublicEvolving
    +public class ResourceManagerOptions {
    +
    +    public static final ConfigOption JOB_TIMEOUT = ConfigOptions
    +        .key("resourcemanager.job.timeout")
    --- End diff --

    Hmm, I'm not sure whether it's clear to the user what an inactive job is. Will add Javadocs, though.
[jira] [Commented] (FLINK-5971) JobLeaderIdService should time out registered jobs
[ https://issues.apache.org/jira/browse/FLINK-5971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902742#comment-15902742 ]

ASF GitHub Bot commented on FLINK-5971:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3488#discussion_r105117165

    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java ---
    @@ -28,6 +28,10 @@
     @PublicEvolving
     public class AkkaOptions {

    +    public static final ConfigOption AKKA_ASK_TIMEOUT = ConfigOptions
    --- End diff --

    True, will add it.
[jira] [Commented] (FLINK-5971) JobLeaderIdService should time out registered jobs
[ https://issues.apache.org/jira/browse/FLINK-5971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902280#comment-15902280 ]

ASF GitHub Bot commented on FLINK-5971:
---

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3488#discussion_r105068133

    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java ---
    @@ -28,6 +28,10 @@
     @PublicEvolving
     public class AkkaOptions {

    +    public static final ConfigOption AKKA_ASK_TIMEOUT = ConfigOptions
    --- End diff --

    Maybe add a comment, for consistency with the other options in this class?
[jira] [Commented] (FLINK-5971) JobLeaderIdService should time out registered jobs
[ https://issues.apache.org/jira/browse/FLINK-5971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902279#comment-15902279 ]

ASF GitHub Bot commented on FLINK-5971:
---

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3488#discussion_r105068068

    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java ---
    @@ -0,0 +1,37 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.configuration;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +
    +/**
    + * The set of configuration options relating to the ResourceManager
    + */
    +@PublicEvolving
    +public class ResourceManagerOptions {
    +
    +    public static final ConfigOption JOB_TIMEOUT = ConfigOptions
    +        .key("resourcemanager.job.timeout")
    --- End diff --

    Can we have a more specific name, like "inactive_job.timeout"? With the current name it looks as if a normal job will also time out after 5 minutes.
[jira] [Commented] (FLINK-5971) JobLeaderIdService should time out registered jobs
[ https://issues.apache.org/jira/browse/FLINK-5971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902273#comment-15902273 ]

ASF GitHub Bot commented on FLINK-5971:
---

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3488#discussion_r105067829

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java ---
    @@ -283,5 +302,31 @@ public void handleError(Exception exception) {
                         JobLeaderIdListener.class.getSimpleName(), exception);
                 }
             }
    +
    +        private void activateTimeout() {
    +            if (timeoutId != null) {
    +                cancelTimeout();
    +            }
    +
    +            final UUID newTimeoutId = UUID.randomUUID();
    +
    +            timeoutId = newTimeoutId;
    +
    +            timeoutFuture = scheduledExecutor.schedule(new Runnable() {
    +                @Override
    +                public void run() {
    +                    listenerJobLeaderIdActions.notifyJobTimeout(jobId, newTimeoutId);
    --- End diff --

    I think this thread is different from the leader retrieval thread, which will call "notifyLeaderAddress", right? So is it possible that, when a job is close to its timeout and the leader address notification arrives, both threads try to `cancelTimeout`? An NPE will be thrown if one of them calls `timeoutFuture.cancel(true)` after the other one has already set `timeoutFuture = null`.
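
The race raised in the comment above can be illustrated with a minimal sketch. This is not the fix that went into the pull request, which this thread does not show; `TimeoutHandle` and its methods are hypothetical names, and `cancel(false)` is used here instead of the `cancel(true)` mentioned in the comment. The unsafe pattern (shown only as a comment) checks the field and then dereferences it, so another thread can null the field in between. The variant below swaps the field to null atomically and cancels only the future it actually took.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical holder for a pending timeout that several threads may cancel. */
class TimeoutHandle {

    // Unsafe check-then-act on a plain field, for comparison:
    //
    //     if (timeoutFuture != null) {
    //         timeoutFuture.cancel(true);   // may throw an NPE if another thread
    //         timeoutFuture = null;         // set the field to null in between
    //     }

    private final AtomicReference<ScheduledFuture<?>> timeoutFuture = new AtomicReference<>();

    /** Schedules a new timeout and cancels the previously installed one, if any. */
    void activate(ScheduledExecutorService executor, Runnable onTimeout, long delayMillis) {
        ScheduledFuture<?> next = executor.schedule(onTimeout, delayMillis, TimeUnit.MILLISECONDS);
        ScheduledFuture<?> previous = timeoutFuture.getAndSet(next);
        if (previous != null) {
            previous.cancel(false);
        }
    }

    /**
     * Safe to call from several threads: getAndSet hands the stored future to
     * exactly one caller, so nobody dereferences a reference that another
     * thread has just cleared.
     *
     * @return true if this call took and cancelled an installed timeout
     */
    boolean cancel() {
        ScheduledFuture<?> current = timeoutFuture.getAndSet(null);
        if (current == null) {
            return false;
        }
        current.cancel(false);
        return true;
    }
}
```

With this shape, two concurrent `cancel()` calls are harmless: one of them receives the future and cancels it, the other sees null and returns false. Guarding both the check and the cancel with a common lock would be an equally valid design.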
[jira] [Commented] (FLINK-5971) JobLeaderIdService should time out registered jobs
[ https://issues.apache.org/jira/browse/FLINK-5971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901398#comment-15901398 ]

ASF GitHub Bot commented on FLINK-5971:
---

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3488#discussion_r104937304

    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java ---
    @@ -0,0 +1,37 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.configuration;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +
    +/**
    + * The set of configuration options relating to the ResourceManager
    + */
    +@PublicEvolving
    +public class ResourceManagerOptions {
    +
    +    public static final ConfigOption JOB_TIMEOUT = ConfigOptions
    +        .key("resourcemanager.job.timeout")
    --- End diff --

    Maybe we can change the name to something like "inactive_job.timeout" or "idle_job.timeout" to make this config option more specific.
[jira] [Commented] (FLINK-5971) JobLeaderIdService should time out registered jobs
[ https://issues.apache.org/jira/browse/FLINK-5971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15899461#comment-15899461 ]

ASF GitHub Bot commented on FLINK-5971:
---

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/3488

    [FLINK-5971] [flip-6] Add timeout for registered jobs on the ResourceManager

    This PR introduces a timeout for inactive jobs on the ResourceManager. A job is inactive if there is no active leader known for this job. If a job times out, it will be removed from the ResourceManager.

    Additionally, this PR removes the dependency of the JobLeaderIdService on the RunningJobsRegistry.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink jobLeaderIdServiceTimeoutJobs

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3488.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3488

commit 6fb0e239921ca10bac218d62155b08fb96e3725d
Author: Till Rohrmann
Date:   2017-03-06T15:57:43Z

    [FLINK-5971] [flip-6] Add timeout for registered jobs on the ResourceManager

    This PR introduces a timeout for inactive jobs on the ResourceManager. A job is inactive if there is no active leader known for this job. If a job times out, it will be removed from the ResourceManager.

    Additionally, this PR removes the dependency of the JobLeaderIdService on the RunningJobsRegistry.
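
The behaviour described in the pull request above can be sketched roughly as follows. This is a self-contained illustration under stated assumptions, not the code from the PR: `InactiveJobTracker`, its methods, and the use of plain strings as job IDs are hypothetical. Only the idea comes from the thread: a job without a known leader gets a timeout, the job is removed when that timeout fires, and each timeout carries a fresh `UUID` (as in the `activateTimeout` diff quoted in the review comments) so that a stale timeout can be recognised and ignored.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: removes jobs that stay without a leader for too long. */
class InactiveJobTracker {

    /** Per-job state; both fields are null while a leader is known. */
    private static final class Entry {
        UUID timeoutId;
        ScheduledFuture<?> timeout;
    }

    private final Map<String, Entry> jobs = new HashMap<>();
    private final ScheduledExecutorService executor;
    private final long timeoutMillis;

    InactiveJobTracker(ScheduledExecutorService executor, long timeoutMillis) {
        this.executor = executor;
        this.timeoutMillis = timeoutMillis;
    }

    /** Registers a job. No leader is known yet, so its timeout starts immediately. */
    synchronized void addJob(String jobId) {
        if (!jobs.containsKey(jobId)) {
            Entry entry = new Entry();
            jobs.put(jobId, entry);
            activateTimeout(jobId, entry);
        }
    }

    /** A leader became known: the job is active, so its timeout is cancelled. */
    synchronized void leaderGained(String jobId) {
        Entry entry = jobs.get(jobId);
        if (entry != null) {
            cancelTimeout(entry);
        }
    }

    /** The leader was lost: the job is inactive again, so a new timeout starts. */
    synchronized void leaderLost(String jobId) {
        Entry entry = jobs.get(jobId);
        if (entry != null) {
            cancelTimeout(entry);
            activateTimeout(jobId, entry);
        }
    }

    synchronized boolean containsJob(String jobId) {
        return jobs.containsKey(jobId);
    }

    private void activateTimeout(String jobId, Entry entry) {
        final UUID newTimeoutId = UUID.randomUUID();
        entry.timeoutId = newTimeoutId;
        entry.timeout = executor.schedule(
                () -> onTimeout(jobId, newTimeoutId), timeoutMillis, TimeUnit.MILLISECONDS);
    }

    private void cancelTimeout(Entry entry) {
        if (entry.timeout != null) {
            entry.timeout.cancel(false);
        }
        entry.timeout = null;
        entry.timeoutId = null;
    }

    /** Runs on the executor thread; ignores timeouts that were cancelled or replaced. */
    private synchronized void onTimeout(String jobId, UUID timeoutId) {
        Entry entry = jobs.get(jobId);
        if (entry != null && timeoutId.equals(entry.timeoutId)) {
            jobs.remove(jobId);
        }
    }
}
```

A caller would create the tracker with a `ScheduledExecutorService` (for example from `Executors.newSingleThreadScheduledExecutor()`), call `addJob` on registration, and forward leader changes to `leaderGained` and `leaderLost`. The ID comparison in `onTimeout` matters because a timeout task may already be running when it is cancelled; without the check it could remove a job whose timeout has since been restarted.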