[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529253#comment-16529253 ]

ASF GitHub Bot commented on FLINK-9456:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6132

> Let ResourceManager notify JobManager about failed/killed TaskManagers
> --
>
>                 Key: FLINK-9456
>                 URL: https://issues.apache.org/jira/browse/FLINK-9456
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Sihua Zhou
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0, 1.5.1
>
>
> Often, the {{ResourceManager}} learns faster about TaskManager
> failures/killings because it directly communicates with the underlying
> resource management framework. Instead of only relying on the
> {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we
> should additionally send a signal from the {{ResourceManager}} to the
> {{JobManager}} if a {{TaskManager}} has died. That way, we can react faster
> to {{TaskManager}} failures and recover our running job/s.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16528590#comment-16528590 ]

ASF GitHub Bot commented on FLINK-9456:
---

Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/6132

    Hi @tillrohrmann, I updated the PR. Could you please have another look?
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16528585#comment-16528585 ]

ASF GitHub Bot commented on FLINK-9456:
---

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6132#discussion_r199315500

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -984,6 +985,15 @@ private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoor
                     operatorBackPressureStats.orElse(null)));
         }

    +    @Override
    +    public void taskManagerTerminated(ResourceID resourceID, Set<AllocationID> allocationIds, Exception cause) {
    --- End diff --

    My earlier thinking was that the `RM` needed to pass along the `allocationIds` that were assigned to the `JM`, because the `SlotManager` may already have assigned slots to the `JM` while the `TM` was killed before the `JM` established a connection to it. This was mainly meant to address the issue in https://issues.apache.org/jira/browse/FLINK-9351, but with the approach you suggested I think the problem in FLINK-9351 gets fixed along the way.
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527656#comment-16527656 ]

ASF GitHub Bot commented on FLINK-9456:
---

Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/6132

    Thanks a lot @sihuazhou. Ping me once you've updated this PR.
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16527026#comment-16527026 ]

ASF GitHub Bot commented on FLINK-9456:
---

Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/6132

    @tillrohrmann Thanks for your review and the good suggestions; I'm changing the code accordingly.
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526534#comment-16526534 ]

ASF GitHub Bot commented on FLINK-9456:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6132#discussion_r198491333

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---
    @@ -278,4 +279,13 @@ void heartbeatFromTaskManager(
          * not available (yet).
          */
         CompletableFuture requestOperatorBackPressureStats(JobVertexID jobVertexId);
    +
    +    /**
    +     * Notifies that the task manager has terminated.
    +     *
    +     * @param resourceID identifying the task manager
    +     * @param allocationIDs held by this job that belong to the task manager
    +     * @param cause of the task manager termination
    +     */
    +    void taskManagerTerminated(ResourceID resourceID, Set<AllocationID> allocationIDs, Exception cause);
    --- End diff --

    Method names should usually be verbs. What about `notifyTaskManagerTermination`?
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526541#comment-16526541 ]

ASF GitHub Bot commented on FLINK-9456:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6132#discussion_r198490029

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java ---
    @@ -45,6 +46,9 @@
         /** Allocation id for which this slot has been allocated. */
         private AllocationID allocationId;

    +    /** Allocation id for which this slot has been allocated. */
    +    private JobID jobId;
    --- End diff --

    Should be annotated with `@Nullable`
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526537#comment-16526537 ]

ASF GitHub Bot commented on FLINK-9456:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6132#discussion_r198491884

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java ---
    @@ -53,4 +56,13 @@
          * @param cause of the allocation failure
          */
         void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause);
    +
    +    /**
    +     * Notifies that the task manager has been terminated.
    --- End diff --

    line break is missing here
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526540#comment-16526540 ]

ASF GitHub Bot commented on FLINK-9456:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6132#discussion_r198489897

    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ---
    @@ -619,7 +619,11 @@ public void taskTerminated(TaskMonitor.TaskTerminated message) {
                 startNewWorker(launched.profile());
             }

    -        closeTaskManagerConnection(id, new Exception(status.getMessage()));
    +        final Exception terminatedCause = new Exception(status.getMessage());
    --- End diff --

    let's call it `terminationCause`
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526535#comment-16526535 ]

ASF GitHub Bot commented on FLINK-9456:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6132#discussion_r198491021

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---
    @@ -278,4 +279,13 @@ void heartbeatFromTaskManager(
          * not available (yet).
          */
         CompletableFuture requestOperatorBackPressureStats(JobVertexID jobVertexId);
    +
    +    /**
    +     * Notifies that the task manager has terminated.
    +     *
    +     * @param resourceID identifying the task manager
    +     * @param allocationIDs held by this job that belong to the task manager
    --- End diff --

    I think this parameter is not needed.
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526538#comment-16526538 ]

ASF GitHub Bot commented on FLINK-9456:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6132#discussion_r198912464

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
    @@ -717,6 +728,32 @@ private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pe
                 mainThreadExecutor);
         }

    +    public void notifyTaskManagerFailed(ResourceID resourceID, InstanceID instanceID, Exception cause) {
    +        final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID);
    +        if (taskManagerRegistration != null) {
    +            final HashMap<JobID, Set<AllocationID>> jobAndAllocationIDMap = new HashMap<>(4);
    +            for (SlotID slotID : taskManagerRegistration.getSlots()) {
    +                TaskManagerSlot taskManagerSlot = slots.get(slotID);
    +                AllocationID allocationID = taskManagerSlot.getAllocationId();
    +                if (allocationID != null) {
    +                    JobID jobId = taskManagerSlot.getJobId();
    +                    Set<AllocationID> jobAllocationIDSet = jobAndAllocationIDMap.get(jobId);
    +                    if (jobAllocationIDSet == null) {
    +                        jobAllocationIDSet = new HashSet<>(2);
    +                        jobAndAllocationIDMap.put(jobId, jobAllocationIDSet);
    +                    }
    +                    jobAllocationIDSet.add(allocationID);
    +                }
    +            }
    +
    +            for (Map.Entry<JobID, Set<AllocationID>> entry : jobAndAllocationIDMap.entrySet()) {
    +                resourceActions.notifyTaskManagerTerminated(entry.getKey(), resourceID, entry.getValue(), cause);
    +            }
    +        } else {
    +            LOG.warn("TaskManager failed before registering with slot manager successfully.");
    +        }
    --- End diff --

    This looks a little bit complicated. Moreover, I don't really like that the control flow is: ResourceManager -> SlotManager -> ResourceManager -> JobManager.

    What about leveraging the existing `ResourceActions#notifyAllocationFailure` method? We could say that we call this method not only when a pending slot request fails but also when we remove a slot. Unregistering a `TaskManager` from the `SlotManager` would then remove its slots, which in turn would trigger a `notifyAllocationFailure` message for each allocated slot. We would have to introduce a `JobMasterGateway#notifyAllocationFailure` method, which we can call from `ResourceActionsImpl#notifyAllocationFailure`. The implementation on the `JobMaster` side would then simply call `SlotPool#failAllocation`.

    By doing it that way, we send multiple messages (which might not be ideal), but we reuse most of the existing code paths without introducing special-case logic. What do you think?
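The flow proposed in this comment can be sketched in a few lines of Java. This is only an illustration under simplifying assumptions, not Flink's actual code: `ResourceActionsSketch`, `SlotSketch`, `SlotManagerSketch`, `JobMasterSketch`, and `AllocationFailureDemo` are hypothetical stand-ins, IDs are plain strings, and the `failedAllocations` list merely stands in for the call to `SlotPool#failAllocation`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for ResourceActions; IDs are plain strings here. */
interface ResourceActionsSketch {
    void notifyAllocationFailure(String jobId, String allocationId, Exception cause);
}

/** Hypothetical slot record; allocationId is null while the slot is free. */
final class SlotSketch {
    final String jobId;
    final String allocationId;

    SlotSketch(String jobId, String allocationId) {
        this.jobId = jobId;
        this.allocationId = allocationId;
    }
}

/** Simplified slot manager: removing a TaskManager fails each allocated slot. */
final class SlotManagerSketch {
    private final Map<String, List<SlotSketch>> slotsByTaskManager = new HashMap<>();
    private final ResourceActionsSketch resourceActions;

    SlotManagerSketch(ResourceActionsSketch resourceActions) {
        this.resourceActions = resourceActions;
    }

    void registerSlot(String taskManagerId, SlotSketch slot) {
        slotsByTaskManager.computeIfAbsent(taskManagerId, k -> new ArrayList<>()).add(slot);
    }

    void unregisterTaskManager(String taskManagerId, Exception cause) {
        List<SlotSketch> removed = slotsByTaskManager.remove(taskManagerId);
        if (removed == null) {
            return; // unknown TaskManager, nothing to fail
        }
        for (SlotSketch slot : removed) {
            if (slot.allocationId != null) {
                // One message per allocated slot, reusing the existing callback.
                resourceActions.notifyAllocationFailure(slot.jobId, slot.allocationId, cause);
            }
        }
    }
}

/** Simplified JobMaster side; the list stands in for SlotPool#failAllocation. */
final class JobMasterSketch {
    final List<String> failedAllocations = new ArrayList<>();

    void notifyAllocationFailure(String allocationId, Exception cause) {
        failedAllocations.add(allocationId);
    }
}

final class AllocationFailureDemo {
    /** Registers two allocated slots and one free slot on "tm-1", then removes "tm-1". */
    static List<String> run() {
        JobMasterSketch jobMaster = new JobMasterSketch();
        // ResourceManager side: forward failures of "job-a" to its JobMaster.
        SlotManagerSketch slotManager = new SlotManagerSketch((jobId, allocationId, cause) -> {
            if ("job-a".equals(jobId)) {
                jobMaster.notifyAllocationFailure(allocationId, cause);
            }
        });
        slotManager.registerSlot("tm-1", new SlotSketch("job-a", "alloc-1"));
        slotManager.registerSlot("tm-1", new SlotSketch("job-a", "alloc-2"));
        slotManager.registerSlot("tm-1", new SlotSketch(null, null)); // free slot
        slotManager.unregisterTaskManager("tm-1", new Exception("container killed"));
        return jobMaster.failedAllocations;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the `SlotManagerSketch` never needs to know about terminated TaskManagers as a separate concept: removing the slots is enough, which is the point of reusing the existing notification path. The cost, as noted in the comment, is one message per allocated slot.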
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526536#comment-16526536 ]

ASF GitHub Bot commented on FLINK-9456:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6132#discussion_r198490490

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -984,6 +985,15 @@ private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoor
                     operatorBackPressureStats.orElse(null)));
         }

    +    @Override
    +    public void taskManagerTerminated(ResourceID resourceID, Set<AllocationID> allocationIds, Exception cause) {
    --- End diff --

    For what do we need the `allocationIds` parameter here?
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526542#comment-16526542 ]

ASF GitHub Bot commented on FLINK-9456:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6132#discussion_r198491683

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -1120,5 +1131,14 @@ public void reportPayload(ResourceID resourceID, Void payload) {
                 return CompletableFuture.completedFuture(null);
             }
         }
    +
    +    protected void notifyTaskManagerCompleted(ResourceID resourceID, Exception cause) {
    +        WorkerRegistration workerRegistration = taskExecutors.remove(resourceID);
    +        if (workerRegistration != null) {
    +            slotManager.notifyTaskManagerFailed(resourceID, workerRegistration.getInstanceID(), cause);
    +        } else {
    +            log.warn("TaskManager failed before registering with ResourceManager successfully.");
    --- End diff --

    This should be a debug log message.
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526533#comment-16526533 ]

ASF GitHub Bot commented on FLINK-9456:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6132#discussion_r198910651

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java ---
    @@ -53,4 +56,13 @@
          * @param cause of the allocation failure
          */
         void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause);
    +
    +    /**
    +     * Notifies that the task manager has been terminated.
    +     * @param jobId to be notified
    +     * @param resourceID identifying the terminated task manager
    +     * @param allocationIDs of the job held that belong to this task manager
    +     * @param cause of the task manager termination.
    +     */
    +    void notifyTaskManagerTerminated(JobID jobId, ResourceID resourceID, Set<AllocationID> allocationIDs, Exception cause);
    --- End diff --

    I think the notification about a terminated `TaskManager` should not come from the `SlotManager` but from the `ResourceManager`. Thus, we should not need this method.
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16526539#comment-16526539 ]

ASF GitHub Bot commented on FLINK-9456:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6132#discussion_r198493145

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java ---
    @@ -1202,6 +1209,111 @@ public void testSlotRequestFailure() throws Exception {
             }
         }

    +    /**
    +     * Tests notify the job manager when the task manager is failed/killed.
    +     */
    +    @Test
    +    public void testNotifyTaskManagerFailed() throws Exception {
    +
    +        final List<Tuple4<JobID, ResourceID, Set<AllocationID>, Exception>> notifiedTaskManagerInfos = new ArrayList<>();
    +
    +        try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), new TestingResourceActions() {
    +            @Override
    +            public void notifyTaskManagerTerminated(JobID jobId, ResourceID resourceID, Set<AllocationID> allocationIDs, Exception cause) {
    +                notifiedTaskManagerInfos.add(new Tuple4<>(jobId, resourceID, allocationIDs, cause));
    +            }
    +        })) {
    --- End diff --

    Indentation looks a bit off here
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504382#comment-16504382 ]

ASF GitHub Bot commented on FLINK-9456:
---

Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/6132

    The failure on Travis is unrelated.
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504290#comment-16504290 ]

ASF GitHub Bot commented on FLINK-9456:
---

Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/6132

    cc @tillrohrmann
[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504268#comment-16504268 ]

ASF GitHub Bot commented on FLINK-9456:
---

GitHub user sihuazhou opened a pull request:

    https://github.com/apache/flink/pull/6132

    [FLINK-9456][Distributed Coordination]Let ResourceManager notify JobManager about failed/killed TaskManagers.

    ## What is the purpose of the change

    *Often, the ResourceManager learns faster about TaskManager failures/killings because it directly communicates with the underlying resource management framework. Instead of only relying on the JobManager's heartbeat to figure out that a TaskManager has died, we should additionally send a signal from the ResourceManager to the JobManager if a TaskManager has died. That way, we can react faster to TaskManager failures and recover our running job/s.*

    ## Brief change log

    - *Add `JobMasterGateway#taskManagerTerminated()` to signal that a task manager has terminated, and do the disconnection there.*
    - *Let the `ResourceManager` notify the `JobMaster` when a task manager terminates.*

    ## Verifying this change

    - Once this approach is verified in general, I will add tests for it.

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): (no)
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
    - The serializers: (no)
    - The runtime per-record code paths (performance sensitive): (no)
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
    - The S3 file system connector: (no)

    ## Documentation

    No

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sihuazhou/flink FLINK-9456

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6132.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6132

commit 652ac037ef3edc75cea0abd4966c2154d6e5fbc0
Author: sihuazhou
Date:   2018-05-10T06:36:27Z

    Let ResourceManager notify JobManager about failed/killed TaskManagers.
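
The change log of this pull request can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code of the pull request: `JobMasterGatewaySketch`, `JobMasterStub`, `ResourceManagerSketch`, and `TerminationDemo` are hypothetical names, resource IDs are plain strings, and the `disconnected` list stands in for whatever the real `JobMaster` does when it disconnects from a task manager.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for the JobMaster RPC gateway. */
interface JobMasterGatewaySketch {
    void taskManagerTerminated(String resourceId, Exception cause);
}

/** Simplified JobMaster that tracks the TaskManagers it is connected to. */
final class JobMasterStub implements JobMasterGatewaySketch {
    private final Set<String> connectedTaskManagers = new HashSet<>();
    final List<String> disconnected = new ArrayList<>();

    void connect(String resourceId) {
        connectedTaskManagers.add(resourceId);
    }

    @Override
    public void taskManagerTerminated(String resourceId, Exception cause) {
        // React only if this job was actually connected to the TaskManager.
        if (connectedTaskManagers.remove(resourceId)) {
            disconnected.add(resourceId + ": " + cause.getMessage());
        }
    }
}

/** Simplified ResourceManager that forwards termination events to every JobMaster. */
final class ResourceManagerSketch {
    private final List<JobMasterGatewaySketch> jobMasters = new ArrayList<>();

    void registerJobMaster(JobMasterGatewaySketch gateway) {
        jobMasters.add(gateway);
    }

    /** Called when the cluster framework (e.g. Yarn or Mesos) reports a dead worker. */
    void onTaskManagerTerminated(String resourceId, Exception cause) {
        for (JobMasterGatewaySketch gateway : jobMasters) {
            gateway.taskManagerTerminated(resourceId, cause);
        }
    }
}

final class TerminationDemo {
    /** Connects a job to "tm-1" and "tm-2", then reports "tm-1" and an unknown "tm-3" as dead. */
    static List<String> run() {
        JobMasterStub jobMaster = new JobMasterStub();
        jobMaster.connect("tm-1");
        jobMaster.connect("tm-2");

        ResourceManagerSketch resourceManager = new ResourceManagerSketch();
        resourceManager.registerJobMaster(jobMaster);

        resourceManager.onTaskManagerTerminated("tm-1", new Exception("container killed"));
        resourceManager.onTaskManagerTerminated("tm-3", new Exception("never connected"));
        return jobMaster.disconnected;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The sketch only shows the direct notification path; the heartbeat between the job master and the task manager would remain in place as the fallback, as the issue description says the signal is sent additionally.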