[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6132

---
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r199315500

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -984,6 +985,15 @@ private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoor
 			operatorBackPressureStats.orElse(null)));
 	}
 
+	@Override
+	public void taskManagerTerminated(ResourceID resourceID, Set<AllocationID> allocationIds, Exception cause) {
--- End diff --

My original idea was that the `RM` needs to tell the `JM` which `allocationIds` on the terminated `TM` were assigned to it, because the `SlotManager` may already have assigned slots to the `JM` while the `TM` was killed before the `JM` established a connection to it. This was mainly meant to address the issue in https://issues.apache.org/jira/browse/FLINK-9351, but I think the approach you suggested fixes the problem in FLINK-9351 as well.

---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198910651

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java ---
@@ -53,4 +56,13 @@
 	 * @param cause of the allocation failure
 	 */
 	void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause);
+
+	/**
+	 * Notifies that the task manager has been terminated.
+	 * @param jobId to be notified
+	 * @param resourceID identifying the terminated task manager
+	 * @param allocationIDs of the job held that belong to this task manager
+	 * @param cause of the task manager termination.
+	 */
+	void notifyTaskManagerTerminated(JobID jobId, ResourceID resourceID, Set<AllocationID> allocationIDs, Exception cause);
--- End diff --

I think the notification about a terminated `TaskManager` should come from the `ResourceManager`, not from the `SlotManager`. Thus, we should not need this method.

---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198491021

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---
@@ -278,4 +279,13 @@ void heartbeatFromTaskManager(
 	 * not available (yet).
 	 */
 	CompletableFuture requestOperatorBackPressureStats(JobVertexID jobVertexId);
+
+	/**
+	 * Notifies that the task manager has terminated.
+	 *
+	 * @param resourceID identifying the task manager
+	 * @param allocationIDs held by this job that belong to the task manager
--- End diff --

I think this parameter is not needed.

---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198490490

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -984,6 +985,15 @@ private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoor
 			operatorBackPressureStats.orElse(null)));
 	}
 
+	@Override
+	public void taskManagerTerminated(ResourceID resourceID, Set<AllocationID> allocationIds, Exception cause) {
--- End diff --

What do we need the `allocationIds` parameter for here?

---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198491683

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
@@ -1120,5 +1131,14 @@ public void reportPayload(ResourceID resourceID, Void payload) {
 			return CompletableFuture.completedFuture(null);
 		}
 	}
+
+	protected void notifyTaskManagerCompleted(ResourceID resourceID, Exception cause) {
+		WorkerRegistration workerRegistration = taskExecutors.remove(resourceID);
+		if (workerRegistration != null) {
+			slotManager.notifyTaskManagerFailed(resourceID, workerRegistration.getInstanceID(), cause);
+		} else {
+			log.warn("TaskManager failed before registering with ResourceManager successfully.");
--- End diff --

This should be a debug log message.

---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198493145

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java ---
@@ -1202,6 +1209,111 @@ public void testSlotRequestFailure() throws Exception {
 	}
 	}
 
+	/**
+	* Tests notify the job manager when the task manager is failed/killed.
+	*/
+	@Test
+	public void testNotifyTaskManagerFailed() throws Exception {
+
+	final List<Tuple4<JobID, ResourceID, Set<AllocationID>, Exception>> notifiedTaskManagerInfos = new ArrayList<>();
+
+	try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), new TestingResourceActions() {
+	@Override
+	public void notifyTaskManagerTerminated(JobID jobId, ResourceID resourceID, Set<AllocationID> allocationIDs, Exception cause) {
+	notifiedTaskManagerInfos.add(new Tuple4<>(jobId, resourceID, allocationIDs, cause));
+	}
+	})) {
--- End diff --

The indentation looks a bit off here.

---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198912464

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
@@ -717,6 +728,32 @@ private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pe
 			mainThreadExecutor);
 	}
 
+	public void notifyTaskManagerFailed(ResourceID resourceID, InstanceID instanceID, Exception cause) {
+		final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID);
+		if (taskManagerRegistration != null) {
+			final HashMap<JobID, Set<AllocationID>> jobAndAllocationIDMap = new HashMap<>(4);
+			for (SlotID slotID : taskManagerRegistration.getSlots()) {
+				TaskManagerSlot taskManagerSlot = slots.get(slotID);
+				AllocationID allocationID = taskManagerSlot.getAllocationId();
+				if (allocationID != null) {
+					JobID jobId = taskManagerSlot.getJobId();
+					Set<AllocationID> jobAllocationIDSet = jobAndAllocationIDMap.get(jobId);
+					if (jobAllocationIDSet == null) {
+						jobAllocationIDSet = new HashSet<>(2);
+						jobAndAllocationIDMap.put(jobId, jobAllocationIDSet);
+					}
+					jobAllocationIDSet.add(allocationID);
+				}
+			}
+
+			for (Map.Entry<JobID, Set<AllocationID>> entry : jobAndAllocationIDMap.entrySet()) {
+				resourceActions.notifyTaskManagerTerminated(entry.getKey(), resourceID, entry.getValue(), cause);
+			}
+		} else {
+			LOG.warn("TaskManager failed before registering with slot manager successfully.");
+		}
--- End diff --

This looks a bit complicated. Moreover, I don't really like that the control flow is ResourceManager -> SlotManager -> ResourceManager -> JobManager.

What about leveraging the existing `ResourceActions#notifyAllocationFailure` method? We could call this method not only when a pending slot request fails but also when we remove a slot. Then unregistering a `TaskManager` from the `SlotManager` would remove its slots, which in turn would trigger a `notifyAllocationFailure` message for each allocated slot.

We would then have to introduce a `JobMasterGateway#notifyAllocationFailure`, which we can call from `ResourceActionsImpl#notifyAllocationFailure`. The implementation on the `JobMaster` side would then simply call `SlotPool#failAllocation`. Doing it this way, we send multiple messages (which might not be ideal), but we reuse most of the existing code paths without introducing special-case logic. What do you think?

---
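The proposed reuse of `notifyAllocationFailure` when a `TaskManager` is unregistered can be sketched as a small, self-contained model. This is a hypothetical simplification, not Flink code: `AllocationFailureSketch`, `SlotManagerSketch`, `JobMasterStub` and `Slot` are invented names, plain `String`s stand in for `JobID`, `ResourceID` and `AllocationID`, and the local `ResourceActions` interface only mirrors the shape of the existing `notifyAllocationFailure(JobID, AllocationID, Exception)` method. The lambda in `main` plays the role of `ResourceActionsImpl`, and `JobMasterStub#notifyAllocationFailure` stands in for a JobMaster that would call `SlotPool#failAllocation`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of reusing notifyAllocationFailure for TaskManager loss.
 * Plain String ids stand in for Flink's JobID, ResourceID and AllocationID.
 */
public class AllocationFailureSketch {

    /** Stand-in for ResourceActions#notifyAllocationFailure(JobID, AllocationID, Exception). */
    interface ResourceActions {
        void notifyAllocationFailure(String jobId, String allocationId, Exception cause);
    }

    /** Stand-in for a JobMaster whose slot pool fails allocations by id (cf. SlotPool#failAllocation). */
    static final class JobMasterStub {
        final List<String> failedAllocations = new ArrayList<>();

        void notifyAllocationFailure(String allocationId, Exception cause) {
            failedAllocations.add(allocationId);
        }
    }

    /** A slot tracked by the SlotManager; jobId and allocationId are null while the slot is free. */
    static final class Slot {
        final String resourceId;
        final String jobId;
        final String allocationId;

        Slot(String resourceId, String jobId, String allocationId) {
            this.resourceId = resourceId;
            this.jobId = jobId;
            this.allocationId = allocationId;
        }
    }

    static final class SlotManagerSketch {
        private final List<Slot> slots = new ArrayList<>();
        private final ResourceActions resourceActions;

        SlotManagerSketch(ResourceActions resourceActions) {
            this.resourceActions = resourceActions;
        }

        void registerSlot(Slot slot) {
            slots.add(slot);
        }

        /** Removes all slots of the TaskManager; each allocated one reports an allocation failure. */
        void unregisterTaskManager(String resourceId, Exception cause) {
            Iterator<Slot> it = slots.iterator();
            while (it.hasNext()) {
                Slot slot = it.next();
                if (!slot.resourceId.equals(resourceId)) {
                    continue;
                }
                it.remove();
                if (slot.allocationId != null) {
                    // One message per allocated slot instead of one grouped message per job.
                    resourceActions.notifyAllocationFailure(slot.jobId, slot.allocationId, cause);
                }
            }
        }
    }

    public static void main(String[] args) {
        Map<String, JobMasterStub> jobMasters = new HashMap<>();
        jobMasters.put("job-1", new JobMasterStub());

        // ResourceManager side (cf. ResourceActionsImpl): forward each failure to the owning JobMaster.
        ResourceActions resourceActions = (jobId, allocationId, cause) -> {
            JobMasterStub jobMaster = jobMasters.get(jobId);
            if (jobMaster != null) {
                jobMaster.notifyAllocationFailure(allocationId, cause);
            }
        };

        SlotManagerSketch slotManager = new SlotManagerSketch(resourceActions);
        slotManager.registerSlot(new Slot("tm-1", "job-1", "alloc-a"));
        slotManager.registerSlot(new Slot("tm-1", null, null));        // free slot: nothing to report
        slotManager.registerSlot(new Slot("tm-2", "job-1", "alloc-b")); // belongs to another TaskManager

        slotManager.unregisterTaskManager("tm-1", new Exception("TaskManager tm-1 was killed."));
        System.out.println(jobMasters.get("job-1").failedAllocations); // prints [alloc-a]
    }
}
```

The trade-off mentioned in the comment is visible in the model: the per-job grouping disappears, at the cost of one message per allocated slot. In this sketch the JobMaster only needs the `AllocationID` to fail the slot, not an established connection to the `TaskManager`.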
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198491884

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java ---
@@ -53,4 +56,13 @@
 	 * @param cause of the allocation failure
 	 */
 	void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause);
+
+	/**
+	 * Notifies that the task manager has been terminated.
--- End diff --

A line break is missing here: there should be an empty line between the description and the `@param` tags.

---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198490029

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java ---
@@ -45,6 +46,9 @@
 	/** Allocation id for which this slot has been allocated. */
 	private AllocationID allocationId;
 
+	/** Allocation id for which this slot has been allocated. */
+	private JobID jobId;
--- End diff --

This field should be annotated with `@Nullable`.

---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198489897

--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ---
@@ -619,7 +619,11 @@ public void taskTerminated(TaskMonitor.TaskTerminated message) {
 			startNewWorker(launched.profile());
 		}
 
-		closeTaskManagerConnection(id, new Exception(status.getMessage()));
+		final Exception terminatedCause = new Exception(status.getMessage());
--- End diff --

Let's call it `terminationCause`.

---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198491333

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---
@@ -278,4 +279,13 @@ void heartbeatFromTaskManager(
 	 * not available (yet).
 	 */
 	CompletableFuture requestOperatorBackPressureStats(JobVertexID jobVertexId);
+
+	/**
+	 * Notifies that the task manager has terminated.
+	 *
+	 * @param resourceID identifying the task manager
+	 * @param allocationIDs held by this job that belong to the task manager
+	 * @param cause of the task manager termination
+	 */
+	void taskManagerTerminated(ResourceID resourceID, Set<AllocationID> allocationIDs, Exception cause);
--- End diff --

Method names should usually be verbs. What about `notifyTaskManagerTermination`?

---
GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/6132

[FLINK-9456][Distributed Coordination]Let ResourceManager notify JobManager about failed/killed TaskManagers.

## What is the purpose of the change

*Often, the ResourceManager learns about TaskManager failures/killings faster because it communicates directly with the underlying resource management framework. Instead of relying only on the JobManager's heartbeat to figure out that a TaskManager has died, we should additionally send a signal from the ResourceManager to the JobManager when a TaskManager dies. That way, we can react faster to TaskManager failures and recover our running job(s).*

## Brief change log

- *Add `JobMasterGateway#taskManagerTerminated()` to notify the JobMaster that a TaskManager has terminated, and do the disconnection there.*
- *Let the `ResourceManager` notify the JobMaster when a TaskManager terminates.*

## Verifying this change

- Once the general approach has been confirmed, I will add tests for it.
## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)

## Documentation

No

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink FLINK-9456

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6132.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #6132

commit 652ac037ef3edc75cea0abd4966c2154d6e5fbc0
Author: sihuazhou
Date: 2018-05-10T06:36:27Z

Let ResourceManager notify JobManager about failed/killed TaskManagers.

---
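The motivation in the PR description (a ResourceManager signal that arrives before the JobManager's own heartbeat timeout) can be illustrated with a small model of the JobMaster side. This is a sketch under stated assumptions, not the PR's implementation: `TaskManagerFailureSketch`, `onHeartbeatTimeout` and `disconnectTaskManager` are hypothetical names (only `notifyTaskManagerTermination` comes from the review), IDs are plain `String`s, and making the disconnect a no-op for an already-disconnected TaskManager is a design choice of this sketch.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical, simplified JobMaster-side model: a TaskManager can be declared dead either by the
 * JobMaster's own heartbeat timeout or, usually earlier, by a notification from the ResourceManager.
 * Both signals funnel into a single disconnect routine.
 */
public class TaskManagerFailureSketch {

    private final Set<String> registeredTaskManagers = new HashSet<>();
    private final List<String> failureLog = new ArrayList<>();

    void registerTaskManager(String resourceId) {
        registeredTaskManagers.add(resourceId);
    }

    /** Slow path: the JobMaster's heartbeat to the TaskManager timed out. */
    void onHeartbeatTimeout(String resourceId) {
        disconnectTaskManager(resourceId, new Exception("Heartbeat of TaskManager " + resourceId + " timed out."));
    }

    /** Fast path: the ResourceManager reports that the TaskManager has terminated. */
    void notifyTaskManagerTermination(String resourceId, Exception cause) {
        disconnectTaskManager(resourceId, cause);
    }

    /** Returns true only for the first signal; later signals for the same TaskManager are no-ops. */
    boolean disconnectTaskManager(String resourceId, Exception cause) {
        if (!registeredTaskManagers.remove(resourceId)) {
            return false;
        }
        // Placeholder: a real JobMaster would release this TaskManager's slots here,
        // which fails the tasks running in them and lets the job recover.
        failureLog.add(resourceId + ": " + cause.getMessage());
        return true;
    }

    List<String> getFailureLog() {
        return failureLog;
    }

    public static void main(String[] args) {
        TaskManagerFailureSketch jobMaster = new TaskManagerFailureSketch();
        jobMaster.registerTaskManager("tm-1");

        jobMaster.notifyTaskManagerTermination("tm-1", new Exception("Container was killed."));
        jobMaster.onHeartbeatTimeout("tm-1"); // the slower signal arrives later and is ignored

        System.out.println(jobMaster.getFailureLog()); // prints [tm-1: Container was killed.]
    }
}
```

Whichever signal arrives first does the work. The ResourceManager notification only shortens the time until the failure is detected, while the heartbeat stays in place as the fallback, matching the "additionally" in the PR description.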