[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

2018-07-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6132


---



2018-06-30 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r199315500
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -984,6 +985,15 @@ private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoor
operatorBackPressureStats.orElse(null)));
}
 
+   @Override
+   public void taskManagerTerminated(ResourceID resourceID, Set<AllocationID> allocationIds, Exception cause) {
--- End diff --

My original thought was that the `RM` needed to tell the `JM` about the 
`allocationIds` assigned to it. The `SlotManager` may already have assigned 
slots to the `JM` while the `TM` was killed before the `JM` established a 
connection to it. This was mainly meant to address the issue in 
https://issues.apache.org/jira/browse/FLINK-9351, but with the approach you 
suggested, I think the problem in FLINK-9351 is fixed as a side effect.
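The following minimal sketch shows why a per-allocation failure signal also covers the FLINK-9351 case. It assumes a hypothetical `SimpleSlotPool` that keys pending slot requests by `String` allocation ids. Flink's actual `SlotPool` and `AllocationID` are more involved. Failing a request by allocation id needs neither a connection to the TaskManager nor a heartbeat from it. As a result, a slot assigned to a TaskManager that died before connecting to the JobManager is still released.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical, simplified stand-in for the JobMaster's slot pool: pending slot
 * requests are keyed by a String allocation id instead of Flink's AllocationID.
 */
final class SimpleSlotPool {

    /** Requests assigned by the ResourceManager whose TaskManager has not offered the slot yet. */
    private final Map<String, CompletableFuture<String>> pendingByAllocationId = new HashMap<>();

    /** Registers a pending request; the future completes with the offering TaskManager's id. */
    CompletableFuture<String> requestSlot(String allocationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingByAllocationId.put(allocationId, future);
        return future;
    }

    /** Completes the pending request when a TaskManager offers the slot. */
    boolean offerSlot(String allocationId, String taskManagerId) {
        CompletableFuture<String> future = pendingByAllocationId.remove(allocationId);
        return future != null && future.complete(taskManagerId);
    }

    /**
     * Fails a pending request identified only by its allocation id. This needs neither a
     * connection to nor a heartbeat from the TaskManager that was supposed to offer the slot.
     */
    boolean failAllocation(String allocationId, Exception cause) {
        CompletableFuture<String> future = pendingByAllocationId.remove(allocationId);
        return future != null && future.completeExceptionally(cause);
    }

    int pendingCount() {
        return pendingByAllocationId.size();
    }
}
```

A second `failAllocation` for the same id returns `false`, so duplicate notifications are harmless.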


---



2018-06-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198910651
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java ---
@@ -53,4 +56,13 @@
 * @param cause of the allocation failure
 */
void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause);
+
+   /**
+* Notifies that the task manager has been terminated.
+* @param jobId to be notified
+* @param resourceID identifying the terminated task manager
+* @param allocationIDs of the job held that belong to this task manager
+* @param cause of the task manager termination.
+*/
+   void notifyTaskManagerTerminated(JobID jobId, ResourceID resourceID, Set<AllocationID> allocationIDs, Exception cause);
--- End diff --

I think the notification about a terminated `TaskManager` should not come 
from the `SlotManager` but from the `ResourceManager`. Thus, we should not need 
this method.


---



2018-06-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198491021
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---
@@ -278,4 +279,13 @@ void heartbeatFromTaskManager(
 * not available (yet).
 */
CompletableFuture requestOperatorBackPressureStats(JobVertexID jobVertexId);
+
+   /**
+* Notifies that the task manager has terminated.
+*
+* @param resourceID identifying the task manager
+* @param allocationIDs held by this job that belong to the task manager
--- End diff --

I think this parameter is not needed.


---



2018-06-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198490490
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -984,6 +985,15 @@ private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoor
operatorBackPressureStats.orElse(null)));
}
 
+   @Override
+   public void taskManagerTerminated(ResourceID resourceID, Set<AllocationID> allocationIds, Exception cause) {
--- End diff --

What do we need the `allocationIds` parameter for here?


---



2018-06-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198491683
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
@@ -1120,5 +1131,14 @@ public void reportPayload(ResourceID resourceID, Void payload) {
return CompletableFuture.completedFuture(null);
}
}
+
+   protected void notifyTaskManagerCompleted(ResourceID resourceID, Exception cause) {
+   WorkerRegistration workerRegistration = taskExecutors.remove(resourceID);
+   if (workerRegistration != null) {
+   slotManager.notifyTaskManagerFailed(resourceID, workerRegistration.getInstanceID(), cause);
+   } else {
+   log.warn("TaskManager failed before registering with ResourceManager successfully.");
--- End diff --

This should be a debug log message.
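The following minimal sketch shows the suggested change under a few assumptions. It uses `java.util.logging` as a stand-in for the SLF4J logger Flink uses, with `Level.FINE` playing the role of `debug`. The `TaskExecutorRegistry` class is hypothetical, and `String` ids stand in for `ResourceID` and `InstanceID`. A TaskManager that terminates before registering is an expected race rather than a problem, so the quieter log level fits.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical, simplified stand-in for the ResourceManager's TaskManager bookkeeping. */
final class TaskExecutorRegistry {

    private static final Logger LOG = Logger.getLogger(TaskExecutorRegistry.class.getName());

    /** resourceId -> instanceId of registered TaskManagers. */
    private final Map<String, String> taskExecutors = new HashMap<>();

    /** Receives (instanceId, cause) for TaskManagers that had registered; stands in for the SlotManager. */
    private final BiConsumer<String, Exception> slotManagerNotifier;

    TaskExecutorRegistry(BiConsumer<String, Exception> slotManagerNotifier) {
        this.slotManagerNotifier = slotManagerNotifier;
    }

    void register(String resourceId, String instanceId) {
        taskExecutors.put(resourceId, instanceId);
    }

    /** Returns true if a registered TaskManager was removed and the slot manager was notified. */
    boolean notifyTaskManagerCompleted(String resourceId, Exception cause) {
        String instanceId = taskExecutors.remove(resourceId);
        if (instanceId != null) {
            slotManagerNotifier.accept(instanceId, cause);
            return true;
        }
        // Expected race: the TaskManager died before it finished registering, so log quietly.
        LOG.log(Level.FINE, "TaskManager {0} terminated before registering with the ResourceManager.", resourceId);
        return false;
    }
}
```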


---



2018-06-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198493145
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java ---
@@ -1202,6 +1209,111 @@ public void testSlotRequestFailure() throws Exception {
}
}
 
+   /**
+* Tests notify the job manager when the task manager is failed/killed.
+*/
+   @Test
+   public void testNotifyTaskManagerFailed() throws Exception {
+
+   final List<Tuple4<JobID, ResourceID, Set<AllocationID>, Exception>> notifiedTaskManagerInfos = new ArrayList<>();
+
+   try (final SlotManager slotManager = createSlotManager(ResourceManagerId.generate(), new TestingResourceActions() {
+   @Override
+   public void notifyTaskManagerTerminated(JobID jobId, ResourceID resourceID, Set<AllocationID> allocationIDs, Exception cause) {
+   notifiedTaskManagerInfos.add(new Tuple4<>(jobId, resourceID, allocationIDs, cause));
+   }
+   })) {
--- End diff --

Indentation looks a bit off here.
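One way to avoid the deep nesting is to pull the anonymous class out into a named test double. Below is a self-contained sketch. `TerminationListener`, `TerminationNotification` and `RecordingTerminationListener` are hypothetical stand-ins for `ResourceActions`, the `Tuple4` and the anonymous `TestingResourceActions` subclass, and `String` ids replace Flink's id types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

/** Hypothetical callback interface standing in for the notification method under test. */
interface TerminationListener {
    void notifyTaskManagerTerminated(String jobId, String resourceId, Set<String> allocationIds, Exception cause);
}

/** One recorded call; replaces the Tuple4 used in the test. */
final class TerminationNotification {
    final String jobId;
    final String resourceId;
    final Set<String> allocationIds;
    final Exception cause;

    TerminationNotification(String jobId, String resourceId, Set<String> allocationIds, Exception cause) {
        this.jobId = jobId;
        this.resourceId = resourceId;
        this.allocationIds = allocationIds;
        this.cause = cause;
    }
}

/**
 * Named test double that records every call. Declaring it as a class keeps the
 * try-with-resources header short instead of nesting an anonymous class inside it.
 */
final class RecordingTerminationListener implements TerminationListener {
    private final List<TerminationNotification> notifications = new ArrayList<>();

    @Override
    public void notifyTaskManagerTerminated(
            String jobId, String resourceId, Set<String> allocationIds, Exception cause) {
        notifications.add(new TerminationNotification(jobId, resourceId, allocationIds, cause));
    }

    List<TerminationNotification> getNotifications() {
        return Collections.unmodifiableList(notifications);
    }
}
```

In the real test, an instance would be created before the `try` statement and passed to `createSlotManager(...)`, assuming the named class extends `TestingResourceActions` there.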


---



2018-06-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198912464
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
@@ -717,6 +728,32 @@ private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pe
mainThreadExecutor);
}
 
+   public void notifyTaskManagerFailed(ResourceID resourceID, InstanceID instanceID, Exception cause) {
+   final TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID);
+   if (taskManagerRegistration != null) {
+   final HashMap<JobID, Set<AllocationID>> jobAndAllocationIDMap = new HashMap<>(4);
+   for (SlotID slotID : taskManagerRegistration.getSlots()) {
+   TaskManagerSlot taskManagerSlot = slots.get(slotID);
+   AllocationID allocationID = taskManagerSlot.getAllocationId();
+   if (allocationID != null) {
+   JobID jobId = taskManagerSlot.getJobId();
+   Set<AllocationID> jobAllocationIDSet = jobAndAllocationIDMap.get(jobId);
+   if (jobAllocationIDSet == null) {
+   jobAllocationIDSet = new HashSet<>(2);
+   jobAndAllocationIDMap.put(jobId, jobAllocationIDSet);
+   }
+   jobAllocationIDSet.add(allocationID);
+   }
+   }
+
+   for (Map.Entry<JobID, Set<AllocationID>> entry : jobAndAllocationIDMap.entrySet()) {
+   resourceActions.notifyTaskManagerTerminated(entry.getKey(), resourceID, entry.getValue(), cause);
+   }
+   } else {
+   LOG.warn("TaskManager failed before registering with slot manager successfully.");
+   }
--- End diff --

This looks a little bit complicated. Moreover, I don't really like that the 
control flow is: ResourceManager -> SlotManager -> ResourceManager -> 
JobManager.
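Wherever the notification ends up originating, the map-building loop in `notifyTaskManagerFailed` can be condensed with `Collectors.groupingBy`. The minimal sketch below assumes a hypothetical `Slot` class whose `jobId` and `allocationId` are both `null` while the slot is free. It uses `String` ids in place of Flink's types.

```java
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical, simplified slot: jobId and allocationId are both null while the slot is free. */
final class Slot {
    final String jobId;
    final String allocationId;

    Slot(String jobId, String allocationId) {
        this.jobId = jobId;
        this.allocationId = allocationId;
    }
}

final class SlotGrouping {
    private SlotGrouping() {}

    /** Groups the allocation ids of all allocated slots by the job that holds them. */
    static Map<String, Set<String>> allocationsByJob(Collection<Slot> slots) {
        return slots.stream()
            // Free slots carry no allocation and are skipped.
            .filter(slot -> slot.allocationId != null)
            .collect(Collectors.groupingBy(
                // groupingBy rejects null keys, so make the invariant explicit.
                slot -> Objects.requireNonNull(slot.jobId, "allocated slot without job id"),
                Collectors.mapping(slot -> slot.allocationId, Collectors.toSet())));
    }
}
```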

What about leveraging the existing `ResourceActions#notifyAllocationFailure` 
method? We could call this method not only in case of a failed pending slot 
request but also whenever we remove a slot. Then unregistering a `TaskManager` 
from the `SlotManager` would remove its slots, which would in turn trigger a 
`notifyAllocationFailure` message for each allocated slot. We would then have to 
introduce a `JobMasterGateway#notifyAllocationFailure`, which we can call from 
`ResourceActionsImpl#notifyAllocationFailure`. The implementation on the 
`JobMaster` side would then simply call `SlotPool#failAllocation`.
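The proposed flow can be sketched end to end under some simplifying assumptions. `AllocationFailureListener`, `ResourceActionsSketch`, `JobMasterSketch` and `SlotManagerSketch` are hypothetical stand-ins for `ResourceActions`, `ResourceActionsImpl`, the `JobMaster` and the `SlotManager`. Ids are plain `String`s, and the lookup of a job's `JobMasterGateway` is reduced to a map. Unregistering a TaskManager removes its slots. Each slot that was allocated produces one `notifyAllocationFailure` call, which the job side turns into a failed allocation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for ResourceActions#notifyAllocationFailure. */
interface AllocationFailureListener {
    void notifyAllocationFailure(String jobId, String allocationId, Exception cause);
}

/** Hypothetical stand-in for the JobMaster side of a new gateway method. */
final class JobMasterSketch {
    final List<String> failedAllocations = new ArrayList<>();

    void notifyAllocationFailure(String allocationId, Exception cause) {
        // In Flink this is where SlotPool#failAllocation(allocationId, cause) would be called.
        failedAllocations.add(allocationId);
    }
}

/** Hypothetical stand-in for ResourceActionsImpl: forwards the failure to the owning job. */
final class ResourceActionsSketch implements AllocationFailureListener {
    private final Map<String, JobMasterSketch> jobMastersByJobId;

    ResourceActionsSketch(Map<String, JobMasterSketch> jobMastersByJobId) {
        this.jobMastersByJobId = jobMastersByJobId;
    }

    @Override
    public void notifyAllocationFailure(String jobId, String allocationId, Exception cause) {
        JobMasterSketch jobMaster = jobMastersByJobId.get(jobId);
        if (jobMaster != null) {
            jobMaster.notifyAllocationFailure(allocationId, cause);
        }
    }
}

/** A slot; jobId and allocationId are both null while the slot is free. */
final class SlotState {
    final String jobId;
    final String allocationId;

    SlotState(String jobId, String allocationId) {
        this.jobId = jobId;
        this.allocationId = allocationId;
    }
}

/** Hypothetical stand-in for the SlotManager. */
final class SlotManagerSketch {
    private final Map<String, List<SlotState>> slotsByTaskManager = new HashMap<>();
    private final AllocationFailureListener resourceActions;

    SlotManagerSketch(AllocationFailureListener resourceActions) {
        this.resourceActions = resourceActions;
    }

    void registerTaskManager(String taskManagerId, List<SlotState> slots) {
        slotsByTaskManager.put(taskManagerId, new ArrayList<>(slots));
    }

    /** Unregistering a TaskManager removes all of its slots. */
    void unregisterTaskManager(String taskManagerId, Exception cause) {
        List<SlotState> slots = slotsByTaskManager.remove(taskManagerId);
        if (slots != null) {
            for (SlotState slot : slots) {
                removeSlot(slot, cause);
            }
        }
    }

    /** Removing an allocated slot reports the allocation as failed, just like a failed slot request. */
    private void removeSlot(SlotState slot, Exception cause) {
        if (slot.allocationId != null) {
            resourceActions.notifyAllocationFailure(slot.jobId, slot.allocationId, cause);
        }
    }
}
```

With this shape, a TaskManager holding n allocated slots yields n messages rather than one per job, but no TaskManager-specific code path is needed.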

By doing it this way, we send multiple messages (which might not be ideal), but 
we reuse most of the existing code paths without introducing special-case logic. 
What do you think?


---



2018-06-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198491884
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java ---
@@ -53,4 +56,13 @@
 * @param cause of the allocation failure
 */
void notifyAllocationFailure(JobID jobId, AllocationID allocationId, Exception cause);
+
+   /**
+* Notifies that the task manager has been terminated.
--- End diff --

A line break is missing here (an empty ` *` line between the description and the `@param` tags).


---



2018-06-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198490029
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/TaskManagerSlot.java ---
@@ -45,6 +46,9 @@
/** Allocation id for which this slot has been allocated. */
private AllocationID allocationId;
 
+   /** Allocation id for which this slot has been allocated. */
+   private JobID jobId;
--- End diff --

Should be annotated with `@Nullable`.
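Here is a minimal sketch of the annotated fields. It uses a locally declared `@Nullable` as a stand-in for `javax.annotation.Nullable`, which is not part of the JDK, and a hypothetical `SlotSketch` class with `String` ids. Setting and clearing `allocationId` and `jobId` together keeps them consistent: a slot has a job exactly when it has an allocation. The sketch also gives `jobId` its own Javadoc, since the diff reuses the allocation id's comment for it.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Objects;

/** Local stand-in for javax.annotation.Nullable so that the sketch compiles with the JDK alone. */
@Documented
@Retention(RetentionPolicy.CLASS)
@Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER})
@interface Nullable {}

/** Hypothetical, simplified version of a slot's allocation state. */
final class SlotSketch {

    /** Allocation id for which this slot has been allocated; null while the slot is free. */
    @Nullable
    private String allocationId;

    /** Job id for which this slot has been allocated; null while the slot is free. */
    @Nullable
    private String jobId;

    /** Marks the slot as allocated; both ids must be provided together. */
    void allocate(String allocationId, String jobId) {
        this.allocationId = Objects.requireNonNull(allocationId);
        this.jobId = Objects.requireNonNull(jobId);
    }

    /** Frees the slot, clearing both ids together. */
    void free() {
        allocationId = null;
        jobId = null;
    }

    @Nullable
    String getAllocationId() {
        return allocationId;
    }

    @Nullable
    String getJobId() {
        return jobId;
    }
}
```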


---



2018-06-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198489897
  
--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java ---
@@ -619,7 +619,11 @@ public void taskTerminated(TaskMonitor.TaskTerminated message) {
startNewWorker(launched.profile());
}
 
-   closeTaskManagerConnection(id, new Exception(status.getMessage()));
+   final Exception terminatedCause = new Exception(status.getMessage());
--- End diff --

Let's call it `terminationCause`.


---



2018-06-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6132#discussion_r198491333
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---
@@ -278,4 +279,13 @@ void heartbeatFromTaskManager(
 * not available (yet).
 */
CompletableFuture requestOperatorBackPressureStats(JobVertexID jobVertexId);
+
+   /**
+* Notifies that the task manager has terminated.
+*
+* @param resourceID identifying the task manager
+* @param allocationIDs held by this job that belong to the task manager
+* @param cause of the task manager termination
+*/
+   void taskManagerTerminated(ResourceID resourceID, Set<AllocationID> allocationIDs, Exception cause);
--- End diff --

Method names should usually be verbs. What about `notifyTaskManagerTermination`?


---



2018-06-06 Thread sihuazhou
GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/6132

[FLINK-9456][Distributed Coordination]Let ResourceManager notify JobManager about failed/killed TaskManagers.

## What is the purpose of the change

*The ResourceManager often learns about failed or killed TaskManagers sooner 
than the JobManager, because it communicates directly with the underlying 
resource management framework. Instead of relying only on the JobManager's 
heartbeat to detect that a TaskManager has died, we should additionally send a 
signal from the ResourceManager to the JobManager when a TaskManager dies. That 
way, we can react faster to TaskManager failures and recover the running jobs.*
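The minimal sketch below makes the latency argument concrete. It assumes a hypothetical `LivenessTracker` with an explicit clock argument; this is not Flink's `HeartbeatManager` API. With heartbeats alone, a dead TaskManager is only detected once the timeout has elapsed since its last heartbeat. An explicit termination signal marks it dead immediately.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical liveness view of TaskManagers on the JobManager side. */
final class LivenessTracker {
    private final long heartbeatTimeoutMillis;
    private final Map<String, Long> lastHeartbeatMillis = new HashMap<>();
    private final Set<String> reportedTerminated = new HashSet<>();

    LivenessTracker(long heartbeatTimeoutMillis) {
        this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
    }

    /** Records a heartbeat from a TaskManager at the given time. */
    void heartbeat(String taskManagerId, long nowMillis) {
        lastHeartbeatMillis.put(taskManagerId, nowMillis);
    }

    /** Explicit signal, e.g. forwarded by the ResourceManager; takes effect immediately. */
    void notifyTerminated(String taskManagerId) {
        reportedTerminated.add(taskManagerId);
    }

    /**
     * A TaskManager is considered dead if it was reported terminated or its heartbeat timed out.
     * TaskManagers that were never seen are not considered dead.
     */
    boolean isConsideredDead(String taskManagerId, long nowMillis) {
        if (reportedTerminated.contains(taskManagerId)) {
            return true;
        }
        Long last = lastHeartbeatMillis.get(taskManagerId);
        return last != null && nowMillis - last > heartbeatTimeoutMillis;
    }
}
```

For example, with a 50 s timeout, two TaskManagers that both die at t = 1 s differ as follows. The one reported by the ResourceManager is treated as dead at once. The other is only treated as dead after the timeout has elapsed since its last heartbeat.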

## Brief change log

  - *Add `JobMasterGateway#taskManagerTerminated()` to notify the `JobMaster` that a TaskManager has terminated, and perform the disconnection there.*
  - *Let the `ResourceManager` notify the `JobMaster` when a TaskManager has terminated.*

## Verifying this change

- Once the general approach has been agreed on, I will add tests for it.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

No

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink FLINK-9456

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6132.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6132


commit 652ac037ef3edc75cea0abd4966c2154d6e5fbc0
Author: sihuazhou 
Date:   2018-05-10T06:36:27Z

Let ResourceManager notify JobManager about failed/killed TaskManagers.




---