[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-10-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1173#comment-1173
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2571
  
Merged


> Implement slot allocation protocol with TaskExecutor
> 
>
> Key: FLINK-4348
> URL: https://issues.apache.org/jira/browse/FLINK-4348
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Kurt Young
> Assignee: Maximilian Michels
>
> When the SlotManager finds a suitable slot in the free pool for a slot
> request, it marks the slot as occupied and then tells the TaskExecutor to give
> the slot to the specified JobMaster.
> A slot request sent to the TaskExecutor should contain the following
> parameters: AllocationID, JobID, SlotID, resourceManagerLeaderSessionID.
> There are three possible responses from the TaskExecutor; below we discuss
> when each one happens and how to handle it.
> 1. Ack: the TaskExecutor gives the slot to the specified JobMaster as
> expected.
> 2. Decline: the slot is already occupied by another AllocationID.
> 3. Timeout: the request or response message was lost, or the network
> transfer was slow.
> In the first case, the ResourceManager needs to do nothing. In the second and
> third cases, the ResourceManager needs to notify the SlotManager, which first
> verifies and clears all previous allocation information for this slot
> request and then tries to find a suitable slot for it again. This may cause
> duplicate allocations: e.g. the slot request to a TaskManager succeeds but the
> response is lost, so we request a slot from another TaskManager, and two
> slots end up assigned to one request. This can be handled by rejecting the
> duplicate registration at the JobMaster.
> Some questions still need further discussion.
> 1. Who sends the slot request to the TaskExecutor, the SlotManager or the
> ResourceManager? I think it's better that the SlotManager delegates the RPC
> call to the ResourceManager whenever it needs to communicate with the outside
> world. The ResourceManager knows which TaskExecutor to send the request to
> based on the ResourceID. Besides, the call that requests a slot from the
> TaskExecutor should not be exposed as an RpcMethod, because only the
> SlotManager should be allowed to call it; other components, for example the
> JobMaster and the TaskExecutor, must not be able to call this method directly.
> 2. If the JobMaster rejects a slot offer from a TaskExecutor, should the
> TaskExecutor notify the ResourceManager of the free slot immediately, or wait
> for the next heartbeat sync? The advantage of the first option is that the
> ResourceManager's view is updated faster; the advantage of the second is that
> it saves an RPC method in the ResourceManager.
> 3. There are two communication styles. First, the slot request could be sent
> as an ask operation whose response is returned as a future. Second, the
> ResourceManager could send the slot request in a fire-and-forget way, with
> the response returned through a separate RPC call. I prefer the first one
> because it is simpler and saves an RPC method in the ResourceManager (the
> callback needed in the second approach).
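The three responses, together with the ask pattern preferred in question 3, can be sketched with a `CompletableFuture`. The TaskExecutor completes the future with an acknowledgement or a decline. If no reply arrives in time, the future fails with a `TimeoutException`.

This is a minimal sketch under stated assumptions, not Flink's API. `SlotRequestMessage`, `Reply`, `Outcome`, `ask`, `classify` and `needsRetry` are hypothetical names. The IDs are plain strings rather than Flink's `AllocationID`/`JobID`/`SlotID` types. The TaskExecutor is modelled as a function. Records require Java 16+.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

public class SlotRequestOutcomes {

    /** Hypothetical message carrying the four parameters listed in the issue. */
    record SlotRequestMessage(String allocationId, String jobId, String slotId, String rmLeaderSessionId) {}

    /** Hypothetical reply: ack, or decline naming the allocation that already occupies the slot. */
    record Reply(boolean acknowledged, String occupyingAllocationId) {}

    enum Outcome { ACK, DECLINE, TIMEOUT }

    /** Ask pattern: the reply comes back as a future, bounded by a timeout (orTimeout is Java 9+). */
    static CompletableFuture<Reply> ask(Function<SlotRequestMessage, CompletableFuture<Reply>> taskExecutor,
                                        SlotRequestMessage request, long timeoutMillis) {
        return taskExecutor.apply(request).orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Blocks on the future and maps its completion to one of the three outcomes. */
    static Outcome classify(CompletableFuture<Reply> replyFuture) {
        try {
            Reply reply = replyFuture.join();
            return reply.acknowledged() ? Outcome.ACK : Outcome.DECLINE;
        } catch (CompletionException e) {
            if (e.getCause() instanceof TimeoutException) {
                // request or response lost, or the network was too slow
                return Outcome.TIMEOUT;
            }
            throw e;
        }
    }

    /** ACK needs no action; DECLINE and TIMEOUT mean the SlotManager must clear and retry. */
    static boolean needsRetry(Outcome outcome) {
        return outcome != Outcome.ACK;
    }

    public static void main(String[] args) {
        SlotRequestMessage msg = new SlotRequestMessage("alloc-1", "job-1", "slot-0", "rm-session-1");
        Outcome ack = classify(ask(r -> CompletableFuture.completedFuture(new Reply(true, null)), msg, 100));
        Outcome decline = classify(ask(r -> CompletableFuture.completedFuture(new Reply(false, "alloc-0")), msg, 100));
        // a TaskExecutor that never answers
        Outcome timeout = classify(ask(r -> new CompletableFuture<>(), msg, 100));
        System.out.println(ack + " " + decline + " " + timeout); // ACK DECLINE TIMEOUT
    }
}
```

Compared with fire-and-forget, the caller needs no extra callback RPC. Both the reply and the timeout arrive on the same future.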

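The duplicate-allocation case in the description relies on the JobMaster accepting at most one slot per AllocationID. Below is a minimal sketch of that bookkeeping. It assumes a hypothetical `offerSlot` method and string IDs, and is not Flink's actual JobMaster interface.

It behaves as follows:

- The first slot offered for an allocation is accepted.
- A repeated offer of the same slot is accepted again.
- Any other slot is rejected, so the TaskExecutor can release it.

```java
import java.util.HashMap;
import java.util.Map;

public class JobMasterSlotOffers {
    // allocationId -> slotId that fulfilled it (hypothetical bookkeeping)
    private final Map<String, String> acceptedSlots = new HashMap<>();

    /** Returns true if the offered slot is accepted for the given allocation. */
    public boolean offerSlot(String allocationId, String slotId) {
        String existing = acceptedSlots.putIfAbsent(allocationId, slotId);
        // accept a first offer, and accept a repeated offer of the same slot idempotently
        return existing == null || existing.equals(slotId);
    }

    public static void main(String[] args) {
        JobMasterSlotOffers jm = new JobMasterSlotOffers();
        System.out.println(jm.offerSlot("alloc-1", "tm1-slot0")); // true
        System.out.println(jm.offerSlot("alloc-1", "tm2-slot3")); // false: duplicate allocation
        System.out.println(jm.offerSlot("alloc-1", "tm1-slot0")); // true: same slot offered again
    }
}
```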

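The delegation proposed in question 1 can be sketched as a plain Java callback interface that the ResourceManager implements and hands to the SlotManager. Because it is not an RPC method, the JobMaster and TaskExecutor have no remote way to invoke it. The ResourceManager resolves the target TaskExecutor from the ResourceID.

All types here (`SlotRequestSender`, `TaskExecutorGateway`, `ResourceManager`, `SlotManager`) are simplified hypothetical stand-ins with string IDs, not Flink's classes.

```java
import java.util.HashMap;
import java.util.Map;

public class SlotRequestDelegation {

    /** Hypothetical TaskExecutor gateway; returns a textual reply for simplicity. */
    interface TaskExecutorGateway {
        String requestSlot(String slotId, String allocationId);
    }

    /** Hypothetical package-private callback: a local Java interface, not an RPC endpoint. */
    interface SlotRequestSender {
        String sendSlotRequest(String resourceId, String slotId, String allocationId);
    }

    /** The ResourceManager knows which TaskExecutor to contact based on the ResourceID. */
    static final class ResourceManager implements SlotRequestSender {
        private final Map<String, TaskExecutorGateway> taskExecutors = new HashMap<>();

        void registerTaskExecutor(String resourceId, TaskExecutorGateway gateway) {
            taskExecutors.put(resourceId, gateway);
        }

        @Override
        public String sendSlotRequest(String resourceId, String slotId, String allocationId) {
            TaskExecutorGateway gateway = taskExecutors.get(resourceId);
            if (gateway == null) {
                throw new IllegalStateException("Unknown TaskExecutor " + resourceId);
            }
            return gateway.requestSlot(slotId, allocationId);
        }
    }

    /** The SlotManager only decides which slot to use and delegates the outside call. */
    static final class SlotManager {
        private final SlotRequestSender sender;

        SlotManager(SlotRequestSender sender) {
            this.sender = sender;
        }

        String allocate(String resourceId, String slotId, String allocationId) {
            return sender.sendSlotRequest(resourceId, slotId, allocationId);
        }
    }

    public static void main(String[] args) {
        ResourceManager rm = new ResourceManager();
        rm.registerTaskExecutor("tm-1", (slotId, allocationId) -> "ack " + slotId + " for " + allocationId);
        SlotManager sm = new SlotManager(rm);
        System.out.println(sm.allocate("tm-1", "slot-0", "alloc-1")); // ack slot-0 for alloc-1
    }
}
```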

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-10-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1174#comment-1174
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

Github user mxm closed the pull request at:

https://github.com/apache/flink/pull/2571


[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15551911#comment-15551911
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2571
  
Rebased to latest `flip-6`.


[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15551563#comment-15551563
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2571
  
Thanks for the comments @KurtYoung. I've updated the PR. No worries, four 
eyes always see more than two and it is natural that ideas progress over time :)


[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15551557#comment-15551557
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2571#discussion_r82157384
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
@@ -136,53 +127,34 @@ public SlotRequestRegistered requestSlot(final SlotRequest request) {
             // record this allocation in bookkeeping
             allocationMap.addAllocation(slot.getSlotId(), allocationId);
             // remove selected slot from free pool
-            final ResourceSlot removedSlot = freeSlots.remove(slot.getSlotId());
-
-            final Future<SlotRequestReply> slotRequestReplyFuture =
-                slot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
-
-            slotRequestReplyFuture.handleAsync(new BiFunction<SlotRequestReply, Throwable, Object>() {
-                @Override
-                public Object apply(SlotRequestReply slotRequestReply, Throwable throwable) {
-                    if (throwable != null) {
-                        // we failed, put the slot and the request back again
-                        if (allocationMap.isAllocated(slot.getSlotId())) {
-                            // only re-add if the slot hasn't been removed in the meantime
-                            freeSlots.put(slot.getSlotId(), removedSlot);
-                        }
-                        pendingSlotRequests.put(allocationId, request);
-                    }
-                    return null;
-                }
-            }, resourceManagerServices.getExecutor());
+            freeSlots.remove(slot.getSlotId());
+
+            sendSlotRequest(slot, request);
         } else {
             LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " +
                 "AllocationID:{}, JobID:{}", allocationId, request.getJobId());
-            Preconditions.checkState(resourceManagerServices != null,
+            Preconditions.checkState(rmServices != null,
                 "Attempted to allocate resources but no ResourceManagerServices set.");
-            resourceManagerServices.allocateResource(request.getResourceProfile());
+            rmServices.allocateResource(request.getResourceProfile());
             pendingSlotRequests.put(allocationId, request);
         }
 
-        return new SlotRequestRegistered(allocationId);
+        return new RMSlotRequestRegistered(allocationId);
     }
 
     /**
-     * Sync slot status with TaskManager's SlotReport.
+     * Notifies the SlotManager that a slot is available again after being allocated.
+     * @param slotID slot id of available slot
      */
-    public void updateSlotStatus(final SlotReport slotReport) {
-        for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
-            updateSlotStatus(slotStatus);
+    public void notifySlotAvailable(ResourceID resourceID, SlotID slotID) {
+        if (!allocationMap.isAllocated(slotID)) {
+            throw new IllegalStateException("Slot was not previously allocated but " +
+                "TaskManager reports it as available again");
         }
-    }
-
-    /**
-     * Registers a TaskExecutor
-     * @param resourceID TaskExecutor's ResourceID
-     * @param gateway TaskExcutor's gateway
-     */
-    public void registerTaskExecutor(ResourceID resourceID, TaskExecutorGateway gateway) {
-        this.taskManagerGateways.put(resourceID, gateway);
+        allocationMap.removeAllocation(slotID);
+        final Map<SlotID, ResourceSlot> slots = registeredSlots.get(resourceID);
+        ResourceSlot freeSlot = slots.get(slotID);
--- End diff --

Actually, this should never happen, but I'll introduce a check nevertheless.
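The check mentioned above could look roughly like the sketch below. Before freeing the slot, the SlotManager verifies that the reporting TaskManager and the slot are actually registered. Without that check, `slots.get(slotID)` would throw a `NullPointerException` whenever `registeredSlots.get(resourceID)` returns null.

This is a simplified sketch with string IDs and hypothetical fields, not the code that was merged. Unlike the diff, it validates everything before mutating the bookkeeping, so a failed call leaves the state untouched.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class SlotAvailability {
    // Hypothetical, simplified bookkeeping: resourceId -> (slotId -> slot description)
    final Map<String, Map<String, String>> registeredSlots = new HashMap<>();
    final Set<String> allocatedSlots = new HashSet<>();
    final Map<String, String> freeSlots = new HashMap<>();

    void registerSlot(String resourceId, String slotId, String slot) {
        registeredSlots.computeIfAbsent(resourceId, k -> new HashMap<>()).put(slotId, slot);
        freeSlots.put(slotId, slot);
    }

    void allocate(String slotId) {
        freeSlots.remove(slotId);
        allocatedSlots.add(slotId);
    }

    void notifySlotAvailable(String resourceId, String slotId) {
        if (!allocatedSlots.contains(slotId)) {
            throw new IllegalStateException("Slot was not previously allocated but "
                    + "TaskManager reports it as available again");
        }
        Map<String, String> slots = registeredSlots.get(resourceId);
        String freeSlot = (slots == null) ? null : slots.get(slotId);
        if (freeSlot == null) {
            // "should never happen": fail with a clear message instead of a NullPointerException
            throw new IllegalStateException("Slot " + slotId + " of TaskManager " + resourceId + " is not registered");
        }
        allocatedSlots.remove(slotId);
        freeSlots.put(slotId, freeSlot);
    }
}
```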


[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15551547#comment-15551547
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2571#discussion_r82156547
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
@@ -416,15 +333,14 @@ private boolean isRequestDuplicated(final SlotRequest request) {
      * Try to register slot, and tell if this slot is newly registered.
--- End diff --

Updating the description.
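The behaviour documented as "Try to register slot, and tell if this slot is newly registered" can be sketched with `Map.putIfAbsent`, which returns `null` only when the key was not already present. The nested-map layout, the string IDs and the `registerNewSlot` name are assumptions for illustration, not the actual SlotManager fields.

```java
import java.util.HashMap;
import java.util.Map;

public class SlotRegistration {
    // Hypothetical layout: resourceId -> (slotId -> slot description)
    private final Map<String, Map<String, String>> registeredSlots = new HashMap<>();

    /** Registers the slot and returns true only if it was not registered before. */
    boolean registerNewSlot(String resourceId, String slotId, String slot) {
        Map<String, String> slots = registeredSlots.computeIfAbsent(resourceId, k -> new HashMap<>());
        return slots.putIfAbsent(slotId, slot) == null;
    }

    public static void main(String[] args) {
        SlotRegistration reg = new SlotRegistration();
        System.out.println(reg.registerNewSlot("tm-1", "s0", "slot-0")); // true
        System.out.println(reg.registerNewSlot("tm-1", "s0", "slot-0")); // false: already registered
    }
}
```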


[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15551536#comment-15551536
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2571#discussion_r82155691
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
@@ -205,46 +177,64 @@ public void handleSlotRequestFailedAtTaskManager(final SlotRequest originalReque
         LOG.info("Slot request failed at TaskManager, SlotID:{}, AllocationID:{}, JobID:{}",
--- End diff --

It can be package-private.
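For the decline and timeout cases described in the issue, the failure handler first clears the request's previous allocation and then looks for another slot. Below is a simplified sketch with package-private methods, as suggested above.

The following are assumptions, not Flink's implementation:

- the fields and the string IDs;
- the stale-failure check;
- not putting the failed slot back into the free pool. After a decline the slot belongs to another allocation, and after a timeout its state is unknown until the TaskExecutor reports again.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class FailedSlotRequests {
    final Deque<String> freeSlots = new ArrayDeque<>();
    final Map<String, String> allocations = new HashMap<>();     // allocationId -> slotId
    final Map<String, String> pendingRequests = new HashMap<>(); // allocationId -> jobId

    /** Picks a free slot, or parks the request as pending if none is free. Returns the slot or null. */
    String requestSlot(String allocationId, String jobId) {
        String slotId = freeSlots.pollFirst();
        if (slotId == null) {
            pendingRequests.put(allocationId, jobId);
            return null;
        }
        allocations.put(allocationId, slotId);
        return slotId;
    }

    /** Handles a decline or timeout for the given slot; returns the newly chosen slot, or null if pending. */
    String handleSlotRequestFailedAtTaskManager(String allocationId, String jobId, String failedSlotId) {
        String current = allocations.get(allocationId);
        if (!failedSlotId.equals(current)) {
            // stale failure: the request was already moved elsewhere, nothing to clear
            return current;
        }
        // verify-and-clear the previous allocation, then try to find a slot again
        allocations.remove(allocationId);
        return requestSlot(allocationId, jobId);
    }
}
```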


[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15551532#comment-15551532
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2571#discussion_r82155453
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
@@ -118,9 +109,9 @@ public void setupResourceManagerServices(ResourceManagerServices resourceManager
      * RPC's main thread to avoid race condition).
      *
      * @param request The detailed request of the slot
-     * @return SlotRequestRegistered The confirmation message to be send to the caller
+     * @return RMSlotRequestRegistered The confirmation message to be send to the caller
      */
-    public SlotRequestRegistered requestSlot(final SlotRequest request) {
+    public RMSlotRequestRegistered requestSlot(final SlotRequest request) {
         final AllocationID allocationId = request.getAllocationId();
         if (isRequestDuplicated(request)) {
             LOG.warn("Duplicated slot request, AllocationID:{}", allocationId);
--- End diff --

+1


[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15551284#comment-15551284
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2571#discussion_r82135948
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
@@ -118,9 +109,9 @@ public void setupResourceManagerServices(ResourceManagerServices resourceManager
      * RPC's main thread to avoid race condition).
      *
      * @param request The detailed request of the slot
-     * @return SlotRequestRegistered The confirmation message to be send to the caller
+     * @return RMSlotRequestRegistered The confirmation message to be send to the caller
      */
-    public SlotRequestRegistered requestSlot(final SlotRequest request) {
+    public RMSlotRequestRegistered requestSlot(final SlotRequest request) {
         final AllocationID allocationId = request.getAllocationId();
         if (isRequestDuplicated(request)) {
             LOG.warn("Duplicated slot request, AllocationID:{}", allocationId);
--- End diff --

Maybe we should return SlotRequestRegistered instead of null when the
request is found to be a duplicate?
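The suggestion above makes `requestSlot` idempotent. A duplicate request is acknowledged again rather than answered with `null`, so a caller that retried after losing the first acknowledgement still gets a usable reply. No second allocation is started.

This is a minimal sketch. `SlotRequestRegistered` here is a hypothetical record standing in for the real confirmation message, and the `allocationsStarted` counter exists only to make the behaviour observable.

```java
import java.util.HashSet;
import java.util.Set;

public class IdempotentSlotRequests {
    /** Hypothetical stand-in for the confirmation message. */
    record SlotRequestRegistered(String allocationId) {}

    private final Set<String> knownAllocations = new HashSet<>();
    int allocationsStarted = 0;

    SlotRequestRegistered requestSlot(String allocationId) {
        if (!knownAllocations.add(allocationId)) {
            // duplicate request: acknowledge again instead of returning null, but do not allocate twice
            return new SlotRequestRegistered(allocationId);
        }
        allocationsStarted++; // placeholder for choosing a free slot or requesting a new container
        return new SlotRequestRegistered(allocationId);
    }
}
```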


[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15551263#comment-15551263
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2571#discussion_r82134943
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -245,22 +255,26 @@ public TaskExecutorGateway call() throws Exception {
resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
throw new Exception("Invalid leader session id");
}
-   return getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class).get(5, TimeUnit.SECONDS);
+   return getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class)
+   .get(timeout.toMilliseconds(), timeout.getUnit());
}
}).handleAsync(new BiFunction() {
@Override
public RegistrationResponse apply(TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
if (throwable != null) {
return new RegistrationResponse.Decline(throwable.getMessage());
} else {
-   WorkerType oldWorker = taskExecutorGateways.remove(resourceID);
-   if (oldWorker != null) {
+   WorkerRegistration oldRegistration = taskExecutors.remove(resourceID);
+   if (oldRegistration != null) {
// TODO :: suggest old taskExecutor to stop itself
slotManager.notifyTaskManagerFailure(resourceID);
--- End diff --

notifyTaskManagerFailure is called again in 
slotManager.registerTaskExecutor; maybe we should keep only one of these calls.
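One way to keep only one of the two calls, sketched under the assumption that the SlotManager owns all cleanup for a re-registering TaskExecutor: the ResourceManager only swaps its registration entry and delegates. `CleanupSlotManager`, `RegistrationHandler` and the string-based IDs are hypothetical; keeping the call on the ResourceManager side instead would work just as well.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical SlotManager that owns the cleanup of a re-registering TaskExecutor.
class CleanupSlotManager {
    final List<String> failureNotifications = new ArrayList<>();
    private final Map<String, Integer> slotsPerTaskExecutor = new HashMap<>();

    void registerTaskExecutor(String resourceId, int numSlots) {
        if (slotsPerTaskExecutor.containsKey(resourceId)) {
            // The single place where a stale registration is cleaned up.
            notifyTaskManagerFailure(resourceId);
        }
        slotsPerTaskExecutor.put(resourceId, numSlots);
    }

    void notifyTaskManagerFailure(String resourceId) {
        failureNotifications.add(resourceId);
        slotsPerTaskExecutor.remove(resourceId);
    }
}

// Hypothetical ResourceManager side: it replaces the registration entry but does not call
// notifyTaskManagerFailure itself, so the cleanup runs exactly once per re-registration.
class RegistrationHandler {
    private final Map<String, String> taskExecutors = new HashMap<>(); // resourceId -> address
    private final CleanupSlotManager slotManager;

    RegistrationHandler(CleanupSlotManager slotManager) {
        this.slotManager = slotManager;
    }

    void registerTaskExecutor(String resourceId, String address, int numSlots) {
        taskExecutors.put(resourceId, address); // replaces any old registration
        slotManager.registerTaskExecutor(resourceId, numSlots);
    }
}
```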


[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15551249#comment-15551249
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2571#discussion_r82134273
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ---
@@ -416,15 +333,14 @@ private boolean isRequestDuplicated(final SlotRequest request) {
 * Try to register slot, and tell if this slot is newly registered.
--- End diff --

The comment seems to be outdated


[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15551229#comment-15551229
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2571#discussion_r82132624
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ---
@@ -136,53 +127,34 @@ public SlotRequestRegistered requestSlot(final SlotRequest request) {
// record this allocation in bookkeeping
allocationMap.addAllocation(slot.getSlotId(), allocationId);
// remove selected slot from free pool
-   final ResourceSlot removedSlot = freeSlots.remove(slot.getSlotId());
-
-   final Future slotRequestReplyFuture =
-   slot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
-
-   slotRequestReplyFuture.handleAsync(new BiFunction() {
-   @Override
-   public Object apply(SlotRequestReply slotRequestReply, Throwable throwable) {
-   if (throwable != null) {
-   // we failed, put the slot and the request back again
-   if (allocationMap.isAllocated(slot.getSlotId())) {
-   // only re-add if the slot hasn't been removed in the meantime
-   freeSlots.put(slot.getSlotId(), removedSlot);
-   }
-   pendingSlotRequests.put(allocationId, request);
-   }
-   return null;
-   }
-   }, resourceManagerServices.getExecutor());
+   freeSlots.remove(slot.getSlotId());
+
+   sendSlotRequest(slot, request);
} else {
LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " +
"AllocationID:{}, JobID:{}", allocationId, request.getJobId());
-   Preconditions.checkState(resourceManagerServices != null,
+   Preconditions.checkState(rmServices != null,
"Attempted to allocate resources but no ResourceManagerServices set.");
-   resourceManagerServices.allocateResource(request.getResourceProfile());
+   rmServices.allocateResource(request.getResourceProfile());
pendingSlotRequests.put(allocationId, request);
}
 
-   return new SlotRequestRegistered(allocationId);
+   return new RMSlotRequestRegistered(allocationId);
}
 
/**
-* Sync slot status with TaskManager's SlotReport.
+* Notifies the SlotManager that a slot is available again after being allocated.
+* @param slotID slot id of available slot
 */
-   public void updateSlotStatus(final SlotReport slotReport) {
-   for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
-   updateSlotStatus(slotStatus);
+   public void notifySlotAvailable(ResourceID resourceID, SlotID slotID) {
+   if (!allocationMap.isAllocated(slotID)) {
+   throw new IllegalStateException("Slot was not previously allocated but " +
+   "TaskManager reports it as available again");
}
-   }
-
-   /**
-* Registers a TaskExecutor
-* @param resourceID TaskExecutor's ResourceID
-* @param gateway TaskExcutor's gateway
-*/
-   public void registerTaskExecutor(ResourceID resourceID, TaskExecutorGateway gateway) {
-   this.taskManagerGateways.put(resourceID, gateway);
+   allocationMap.removeAllocation(slotID);
+   final Map slots = registeredSlots.get(resourceID);
+   ResourceSlot freeSlot = slots.get(slotID);
--- End diff --

Better to check whether the slot exists, in case the TM reports an unknown 
free slot.
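A sketch of the suggested guard, assuming simplified string-keyed bookkeeping in place of the real `registeredSlots`/`allocationMap` structures (`SlotBookkeeping` is a hypothetical class). The existence check runs before any bookkeeping is mutated, so a report about an unknown slot leaves the state untouched.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified bookkeeping: ResourceID -> (SlotID -> slot description).
class SlotBookkeeping {
    final Map<String, Map<String, String>> registeredSlots = new HashMap<>();
    final Set<String> allocatedSlots = new HashSet<>();
    final Map<String, String> freeSlots = new HashMap<>();

    void notifySlotAvailable(String resourceId, String slotId) {
        Map<String, String> slots = registeredSlots.get(resourceId);
        if (slots == null || !slots.containsKey(slotId)) {
            // The TaskManager reports a slot that was never registered: reject it
            // instead of running into a NullPointerException further down.
            throw new IllegalStateException(
                "Unknown slot " + slotId + " reported as free by " + resourceId);
        }
        if (!allocatedSlots.remove(slotId)) {
            throw new IllegalStateException(
                "Slot " + slotId + " was not previously allocated but is reported as available again");
        }
        freeSlots.put(slotId, slots.get(slotId));
    }
}
```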


[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548796#comment-15548796
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2571
  
As discussed in this PR, I've pushed the following changes:

1. Move the slot registration and allocation report to the registration
of the TaskExecutor

2. Let the TaskExecutor immediately notify the ResourceManager once a
slot becomes free. The ResourceManager has to confirm this
notification; otherwise, future slot allocations will be blocked because the
ResourceManager's state is not in sync.

3. Change the fencing in handleSlotRequestFailedAtTaskManager to protect
against TaskExecutors which are not registered anymore.
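Point 2 can be sketched as follows, assuming a single-threaded TaskExecutor (as with an RPC main thread) and modelling the ResourceManager RPC as a function that returns a `CompletableFuture`. `FreeSlotNotifier` and its methods are hypothetical names: a freed slot stays marked as awaiting confirmation until the ResourceManager acknowledges it, and can be resent if the acknowledgement never arrives.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical TaskExecutor-side sketch: a freed slot stays "awaiting confirmation" until the
// ResourceManager acknowledges the notification. Assumes all calls happen on one thread.
class FreeSlotNotifier {
    private final Set<String> awaitingConfirmation = new HashSet<>();
    // Stand-in for the RPC to the ResourceManager; completes normally once the RM confirms.
    private final Function<String, CompletableFuture<Void>> rmNotifySlotAvailable;

    FreeSlotNotifier(Function<String, CompletableFuture<Void>> rpc) {
        this.rmNotifySlotAvailable = rpc;
    }

    void slotFreed(String slotId) {
        awaitingConfirmation.add(slotId);
        send(slotId);
    }

    private void send(String slotId) {
        rmNotifySlotAvailable.apply(slotId).whenComplete((ack, failure) -> {
            if (failure == null) {
                awaitingConfirmation.remove(slotId); // the RM's view is in sync again
            }
            // On failure the slot stays unconfirmed; retryUnconfirmed() can resend it.
        });
    }

    void retryUnconfirmed() {
        // Iterate over a copy because a synchronous confirmation removes entries.
        for (String slotId : new HashSet<>(awaitingConfirmation)) {
            send(slotId);
        }
    }

    boolean isAwaitingConfirmation(String slotId) {
        return awaitingConfirmation.contains(slotId);
    }
}
```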


[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-10-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15544909#comment-15544909
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2571
  
@KurtYoung I see, that makes sense. When the TM loses its connection to the 
RM, this will eventually be detected by the heartbeating, and the list of pending 
slot allocation removal requests will be cleared. TM and RM will re-sync the 
slot status on reconnecting. 
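A minimal sketch of that behaviour on the TaskExecutor side, assuming a simple map from SlotID to AllocationID (null meaning free); `TaskExecutorSlotState` and its methods are hypothetical. When the ResourceManager is lost, the pending notifications are dropped, and on reconnecting a full slot report is sent instead of replaying them.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical TaskExecutor state for the disconnect/reconnect case described above.
class TaskExecutorSlotState {
    // SlotID -> AllocationID, or null if the slot is free
    final Map<String, String> slots = new HashMap<>();
    // Slots whose "slot is free again" notification has not been confirmed by the RM yet
    final Set<String> pendingRemovalNotifications = new HashSet<>();

    void onResourceManagerLost() {
        // Heartbeat timed out: the RM will discard this TaskExecutor's slots anyway,
        // so the outstanding notifications no longer mean anything.
        pendingRemovalNotifications.clear();
    }

    // On reconnect, the complete current status is reported as a snapshot.
    Map<String, String> buildSlotReport() {
        return new HashMap<>(slots);
    }
}
```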


[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-10-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15538049#comment-15538049
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/2571
  
@mxm Regarding the last heartbeat point, what I'm really trying to say is that 
the failing free request will not be stuck in the unconfirmed list forever, since 
we have the heartbeat manager to monitor the network connection between RM 
and TM. If the network is really broken, they will treat each other as dead and 
take the corresponding actions. 
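The timeout-based detection this relies on can be sketched as below. This is not Flink's heartbeat manager: `HeartbeatMonitorSketch` is a hypothetical class, and time is passed in explicitly so the logic can be exercised without real clocks or threads.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical heartbeat bookkeeping on the ResourceManager side.
class HeartbeatMonitorSketch {
    private final long timeoutMillis;
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    HeartbeatMonitorSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    void receiveHeartbeat(String resourceId, long nowMillis) {
        lastHeartbeat.put(resourceId, nowMillis);
    }

    // Returns the TaskExecutors considered dead and forgets them; the caller would then
    // release their slots and re-queue or fail their allocations.
    List<String> expire(long nowMillis) {
        List<String> dead = new ArrayList<>();
        lastHeartbeat.entrySet().removeIf(e -> {
            boolean timedOut = nowMillis - e.getValue() > timeoutMillis;
            if (timedOut) {
                dead.add(e.getKey());
            }
            return timedOut;
        });
        return dead;
    }
}
```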


[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-09-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15536365#comment-15536365
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2571
  
Great to hear that we're on the same page :)

>I think there's no need to stick to the failed slot when the allocation fails 
at the RPC level. Just put it back into the free pool and give it another shot.

Yes, we can simply trigger processing of pending requests via 
`handleFreeSlot`.
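A sketch of that idea, assuming hypothetical string-based bookkeeping (`RetryingSlotManager` is not a Flink class): when the RPC to the TaskExecutor fails, the allocation is dropped, the request goes back to the front of the pending queue, and the slot is passed to `handleFreeSlot`, which either serves the oldest pending request or returns the slot to the free pool. With a single free slot this retries the same slot, as discussed further below.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical ResourceManager-side handling of a slot request that failed at the RPC level.
class RetryingSlotManager {
    final Deque<String> pendingRequests = new ArrayDeque<>(); // AllocationIDs waiting for a slot
    final Deque<String> freeSlots = new ArrayDeque<>();       // SlotIDs
    final Map<String, String> allocations = new HashMap<>();  // SlotID -> AllocationID

    // Called when the RPC to the TaskExecutor timed out or failed.
    void handleSlotRequestFailed(String slotId, String allocationId) {
        allocations.remove(slotId);
        pendingRequests.addFirst(allocationId); // keep the request, don't pin it to this slot
        handleFreeSlot(slotId);
    }

    // A slot became usable again: give it to the oldest pending request, if any.
    void handleFreeSlot(String slotId) {
        String next = pendingRequests.pollFirst();
        if (next == null) {
            freeSlots.addLast(slotId);
        } else {
            allocations.put(slotId, next);
            // ... send the slot request to the TaskExecutor again here ...
        }
    }
}
```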

>Actually, I think the pending requests act like your extra list of 
unconfirmed requests. (And as you pointed out at the end, we actually don't need this list, 
as the TaskManager will correct our fault by rejecting the allocation.)

I think PendingRequests is not the same, because it is a list of outstanding 
requests, not of requests that have already been issued to TaskExecutors. But as we 
found out, we don't need a special list for that on the ResourceManager 
side.

>Yes, I also thought this might be a solution. And I think this can work 
with the Heartbeat manager, since if you cannot send the free message to the RM, 
you will not be able to send heartbeats either. After some timeout, the RM will treat 
the TaskManager as dead, and some garbage collection logic in the RM will take care 
of all the allocations and slots which belong to this TaskManager. 

Are you saying you would rather let the HeartbeatManager send out the 
removal of slots? That would work but depending on the heartbeat interval this 
could take slightly longer. Semantically, it doesn't make much difference.


[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-09-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15536220#comment-15536220
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/2571
  
@mxm Thanks for your thoughts, I really like the discussion. :smirk:

You just pointed out another point which I missed in the previous reply. 
Actually, I noticed that right after I posted the previous reply; as it turns 
out, my solution was exactly the same as the one you proposed! 

Some comments inline:

> It is guaranteed that the ResourceManager will receive an RPC response of 
some sort. Either a reply from the TaskExecutor, or a timeout/error which is 
returned by the future. If the request is then retried, the TaskExecutor 
receives the same request twice but will simply acknowledge it again

I think there is no need to stick to the failed slot when the allocation fails 
due to an RPC error. Just put it back into the free pool and give it another 
shot. Actually, I think the pending requests act like your extra list of 
unconfirmed requests. (And as you pointed out at the end, we don't really need 
this list, since the TaskManager will correct our mistake by rejecting the 
allocation.)
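
A minimal sketch of the retry policy described here. It assumes a heavily simplified bookkeeping class on the ResourceManager side. `RetryingSlotManager` and its methods are hypothetical names, and IDs are plain strings rather than Flink's `SlotID`/`AllocationID` types. When the RPC fails, the slot goes back into the free pool and the request is re-queued at the head. With a single free slot, the same slot is therefore simply tried again.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified RM-side bookkeeping; not Flink's SlotManager API.
class RetryingSlotManager {
    private final Deque<String> freeSlots = new ArrayDeque<>();        // free slot ids
    private final Deque<String> pendingRequests = new ArrayDeque<>();  // allocation ids waiting for a slot
    private final Map<String, String> inFlight = new HashMap<>();      // allocationId -> slotId awaiting a reply

    void addFreeSlot(String slotId) {
        freeSlots.addLast(slotId);
        tryAssign();
    }

    void requestSlot(String allocationId) {
        pendingRequests.addLast(allocationId);
        tryAssign();
    }

    /** Pairs pending requests with free slots; the actual RPC to the TaskExecutor is elided. */
    private void tryAssign() {
        while (!freeSlots.isEmpty() && !pendingRequests.isEmpty()) {
            String slotId = freeSlots.pollFirst();
            String allocationId = pendingRequests.pollFirst();
            inFlight.put(allocationId, slotId);
        }
    }

    /** RPC timeout or error: return the slot to the free pool and retry the request. */
    void onAllocationRpcFailed(String allocationId) {
        String slotId = inFlight.remove(allocationId);
        if (slotId == null) {
            return; // already confirmed or cancelled
        }
        freeSlots.addLast(slotId);
        pendingRequests.addFirst(allocationId); // keep the retried request at the head of the queue
        tryAssign();
    }

    void onAllocationConfirmed(String allocationId) {
        inFlight.remove(allocationId);
    }

    String slotFor(String allocationId) {
        return inFlight.get(allocationId);
    }
}
```

If the TaskManager did accept the first attempt, the retry is harmless. The TaskManager or the JobMaster rejects the duplicate.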

> There is one more problem though. How to prevent a false request from the 
ResourceManager to the TaskExecutor in case the ResourceManager hasn't received 
a reply from the TaskExecutor but the TaskExecutor has already removed the slot 
again

When the allocation fails due to an RPC error and we only have one free slot, 
it's true that we will keep retrying the same slot and keep failing. Meanwhile, 
the task may actually finish and the slot becomes free again; then our request 
reaches the TaskManager. It's OK for the TaskManager to accept the request: in 
the end, the JobManager will reject this allocation, and the slot will become 
free again. 
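
The claim that a late duplicate is harmless can be sketched as follows. The sketch assumes the JobMaster accepts each slot offer at most once, and only for an allocation it is still waiting for. `JobMasterSlotPool` and `TaskExecutorSlot` are hypothetical stand-ins, not Flink classes.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical JobMaster-side view: allocations it is still waiting for.
class JobMasterSlotPool {
    private final Set<String> outstandingAllocations = new HashSet<>();

    void expect(String allocationId) {
        outstandingAllocations.add(allocationId);
    }

    /** Accepts an offer only once, and only for an allocation this JobMaster still needs. */
    boolean offerSlot(String allocationId) {
        return outstandingAllocations.remove(allocationId);
    }
}

// Hypothetical TaskExecutor-side slot.
class TaskExecutorSlot {
    private String allocationId; // null means the slot is free

    boolean isFree() {
        return allocationId == null;
    }

    void allocate(String id) {
        allocationId = id;
    }

    /** Offers the slot to the JobMaster; a rejected offer frees the slot again. */
    void offerTo(JobMasterSlotPool jobMaster) {
        if (allocationId != null && !jobMaster.offerSlot(allocationId)) {
            allocationId = null; // rejected: the slot is free again (and can be reported to the RM)
        }
    }
}
```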

>Again, the only solution for this problem seems to be to keep a list of 
unconfirmed slot allocation removal requests at the TaskExecutor. 

Yes, I also thought this might be a solution. And I think it can work together 
with the heartbeat manager: if you cannot send the free message to the RM, you 
will not be able to send heartbeats either. After some timeout, the RM will 
treat the TaskManager as dead, and some garbage collection logic in the RM will 
take care of all the allocations and slots which belong to this TaskManager. 
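
A rough sketch of this heartbeat-based cleanup. It assumes the ResourceManager records a timestamp per TaskExecutor and periodically expires those that have been silent for longer than a timeout. `HeartbeatTracker` is a hypothetical name. Timestamps are passed in explicitly so the logic does not depend on a real clock.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical RM-side heartbeat bookkeeping with slot garbage collection.
class HeartbeatTracker {
    private final long timeoutMillis;
    private final Map<String, Long> lastHeartbeat = new HashMap<>();             // taskExecutorId -> timestamp
    private final Map<String, Set<String>> slotsByTaskExecutor = new HashMap<>(); // taskExecutorId -> slot ids

    HeartbeatTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    void register(String taskExecutorId, Set<String> slotIds, long now) {
        lastHeartbeat.put(taskExecutorId, now);
        slotsByTaskExecutor.put(taskExecutorId, new HashSet<>(slotIds));
    }

    /** Heartbeats from unknown TaskExecutors are ignored. */
    void heartbeat(String taskExecutorId, long now) {
        lastHeartbeat.computeIfPresent(taskExecutorId, (id, previous) -> now);
    }

    /** Treats silent TaskExecutors as dead and returns all of their slots for cleanup. */
    Set<String> expire(long now) {
        Set<String> removedSlots = new HashSet<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (now - entry.getValue() > timeoutMillis) {
                it.remove();
                removedSlots.addAll(slotsByTaskExecutor.remove(entry.getKey()));
            }
        }
        return removedSlots;
    }

    boolean isRegistered(String taskExecutorId) {
        return lastHeartbeat.containsKey(taskExecutorId);
    }
}
```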

All in all, I think this version is still much simpler than the first one. :smirk:



[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-09-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15535918#comment-15535918
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2571
  
@KurtYoung It is guaranteed that the ResourceManager will receive an RPC 
response of some sort. Either a reply from the TaskExecutor, or a timeout/error 
which is returned by the future. If the request is then retried, the 
TaskExecutor receives the same request twice but will simply acknowledge it 
again. The ResourceManager just keeps retrying. In the worst case, the 
TaskExecutor has already freed the slot again because the JobManager doesn't 
need it anymore. If the TaskExecutor then reports that the slot is available 
again, we know that we can stop retrying.
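
The retry argument relies on the TaskExecutor treating a repeated request with the same allocation id as a no-op. Here is a minimal sketch of that idempotent check. It uses a hypothetical `IdempotentSlotTable` keyed by string slot ids, not Flink's actual slot table.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical TaskExecutor-side slot table; not Flink's API.
class IdempotentSlotTable {
    enum Response { ACK, DECLINE }

    private final Map<String, String> allocationBySlot = new HashMap<>(); // slotId -> allocationId

    /** A retried request with the same allocation id is acknowledged again instead of failing. */
    Response requestSlot(String slotId, String allocationId) {
        String current = allocationBySlot.get(slotId);
        if (current == null) {
            allocationBySlot.put(slotId, allocationId);
            return Response.ACK;
        }
        return current.equals(allocationId) ? Response.ACK : Response.DECLINE;
    }

    void freeSlot(String slotId) {
        allocationBySlot.remove(slotId);
    }
}
```

This also exposes the weak spot. Once the slot has been freed, a delayed retry carrying the old allocation id is accepted again.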

This requires us to keep an extra list of unconfirmed requests to the 
TaskExecutor. If the request is still unconfirmed when the slot is free again 
or occupied by a different allocation, we can cancel the retrying and delete 
the unconfirmed request. This is slightly more complicated than I initially 
thought :) 
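
A sketch of that extra list of unconfirmed requests. It assumes the ResourceManager tracks at most one in-flight request per slot. `UnconfirmedRequests` is hypothetical. A slot report for the slot either confirms the request (same allocation id) or cancels further retries (slot free or held by another allocation).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical RM-side list of unconfirmed slot requests.
class UnconfirmedRequests {
    private final Map<String, String> unconfirmed = new HashMap<>(); // slotId -> allocationId

    void sent(String slotId, String allocationId) {
        unconfirmed.put(slotId, allocationId);
    }

    void acknowledged(String slotId, String allocationId) {
        unconfirmed.remove(slotId, allocationId);
    }

    /** Whether a retry should still be sent after a timeout. */
    boolean shouldRetry(String slotId, String allocationId) {
        return allocationId.equals(unconfirmed.get(slotId));
    }

    /**
     * The TaskExecutor reported the slot (reportedAllocation == null means free).
     * Returns true if the report confirms the pending request; in every case retrying stops.
     */
    boolean slotReported(String slotId, String reportedAllocation) {
        String pending = unconfirmed.remove(slotId);
        return pending != null && pending.equals(reportedAllocation);
    }
}
```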

There is one more problem, though: how do we prevent a false request from the 
ResourceManager to the TaskExecutor in case the ResourceManager hasn't received 
a reply from the TaskExecutor but the TaskExecutor has already removed the slot 
again (i.e. the task has finished)? The slot would be allocated although it is 
no longer needed.

Note that sending back a current allocation list when declining a request 
does not cover the case in which a slot has already been released again. The 
TaskExecutor may have tried to decline a request and have failed. In the 
meantime, the ResourceManager sends the same request again. This results in a 
second (duplicate) slot allocation.

Again, the only solution for this problem seems to be to keep a list of 
unconfirmed slot allocation removal requests at the TaskExecutor. The 
ResourceManager has to acknowledge all slot allocation removals. The 
TaskExecutor can then de-duplicate any requests for slots whose removal the 
ResourceManager has not yet acknowledged.

Actually, it should suffice to have only one list with unconfirmed slot 
allocation removals at the TaskExecutor. The ResourceManager doesn't need a 
list to filter because it relies on the TaskExecutor to filter duplicate 
requests correctly.
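
The de-duplication scheme can be sketched like this. The sketch assumes the TaskExecutor keeps freed allocation ids in a set until the ResourceManager acknowledges the removal. `DeduplicatingTaskExecutor` and its methods are illustrative only and do not correspond to Flink's `TaskExecutor` API.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical TaskExecutor that filters stale slot requests from the ResourceManager.
class DeduplicatingTaskExecutor {
    private final Map<String, String> allocationBySlot = new HashMap<>(); // slotId -> allocationId
    private final Set<String> unconfirmedRemovals = new HashSet<>();      // freed allocation ids not yet acked by the RM

    /** Returns true (ack) or false (decline). */
    boolean requestSlot(String slotId, String allocationId) {
        if (unconfirmedRemovals.contains(allocationId)) {
            return false; // delayed retry of an allocation that was already freed: decline it
        }
        String current = allocationBySlot.putIfAbsent(slotId, allocationId);
        return current == null || current.equals(allocationId);
    }

    /** Frees a slot and remembers its allocation id until the RM confirms the removal. */
    String freeSlot(String slotId) {
        String allocationId = allocationBySlot.remove(slotId);
        if (allocationId != null) {
            unconfirmedRemovals.add(allocationId);
        }
        return allocationId; // would be sent to the RM as a "slot is free" notification
    }

    /** The RM acknowledged the removal, so it will not retry this allocation any more. */
    void removalConfirmed(String allocationId) {
        unconfirmedRemovals.remove(allocationId);
    }

    boolean isAllocated(String slotId) {
        return allocationBySlot.containsKey(slotId);
    }
}
```

The ResourceManager needs no filtering list of its own here, which matches the point above. If acknowledgements were lost for good, the set would keep growing. The thread suggests relying on heartbeat timeouts for that case.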

**TL;DR**
I think we need to change the PR title. Long story short, in addition to the 
previously discussed changes, we need the ResourceManager to confirm slot 
allocation removals reported by the TaskExecutor. The TaskExecutor has to keep 
the allocation ids of previously freed slots around to de-duplicate any old 
incoming slot requests from the ResourceManager.

Thank you so much for your feedback. Please tell me if anything is unclear. 
You're right that the protocol is quite complex.



[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-09-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15535565#comment-15535565
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/2571
  
@mxm I think that situation can happen when the RM requests a slot from the TM 
and the TM accepts the request, but the response is somehow lost. Typically, 
when an RPC error occurs, the sender cannot know whether the receiver has 
handled the request or not. 
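
The ambiguity described here shows up directly with the ask pattern. Below is a minimal sketch using `CompletableFuture.orTimeout` (Java 9+). It assumes a hypothetical gateway that replies `true` for ack and `false` for decline. A timeout and an RPC error map to the same outcome, because in neither case can the sender tell whether the TaskExecutor processed the request.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical classification of a slot request reply; not Flink's gateway API.
class SlotRequestOutcome {
    enum Outcome { ACK, DECLINE, TIMEOUT }

    /**
     * Maps a TaskExecutor reply (true = ack, false = decline) to an outcome.
     * TIMEOUT is ambiguous: the slot may or may not have been allocated.
     */
    static CompletableFuture<Outcome> classify(CompletableFuture<Boolean> reply, long timeoutMillis) {
        return reply
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS) // completes the reply exceptionally on timeout
                .handle((accepted, error) -> {
                    if (error != null) {
                        return Outcome.TIMEOUT; // timeout or RPC error: result unknown to the sender
                    }
                    return accepted ? Outcome.ACK : Outcome.DECLINE;
                });
    }
}
```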

The changes look good to me. 




[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-09-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15535527#comment-15535527
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2571
  
Thanks for the feedback, @beyond1920 and @KurtYoung.

You're right, the changes don't allow slots to be released by a 
TaskExecutor. We can change that by explicitly reporting to the RM when a slot 
becomes free. This may also decrease latency in case a task finishes and new 
ones are waiting to be deployed.
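
A sketch of how the ResourceManager might handle such an explicit notification. The names are hypothetical (`FreeSlotNotifications`, `onSlotFreed`). A waiting request is served as soon as the TaskExecutor reports the slot as free, instead of waiting for the next status sync.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical RM-side handling of "slot is free" notifications from TaskExecutors.
class FreeSlotNotifications {
    private final Deque<String> waitingRequests = new ArrayDeque<>(); // allocation ids
    private final Deque<String> freeSlots = new ArrayDeque<>();       // slot ids
    private final Map<String, String> allocations = new HashMap<>();  // slotId -> allocationId

    /** Returns the slot assigned right away, if any; otherwise the request waits. */
    Optional<String> requestSlot(String allocationId) {
        String slotId = freeSlots.pollFirst();
        if (slotId == null) {
            waitingRequests.addLast(allocationId);
            return Optional.empty();
        }
        allocations.put(slotId, allocationId); // the RPC to the TaskExecutor is elided
        return Optional.of(slotId);
    }

    /** Called when a TaskExecutor reports a slot as free; returns the request served with it, if any. */
    Optional<String> onSlotFreed(String slotId) {
        allocations.remove(slotId);
        String next = waitingRequests.pollFirst();
        if (next == null) {
            freeSlots.addLast(slotId);
            return Optional.empty();
        }
        allocations.put(slotId, next);
        return Optional.of(next);
    }

    boolean isFree(String slotId) {
        return freeSlots.contains(slotId);
    }
}
```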

>b. When we handleSlotRequestFailedAtTaskManager, we will make this slot 
free again. If the slot is occupied by some other task now, we will 
continuously failed for all allocation on this slot. ( this can be fixed by 3)

How can that happen? The slot will not appear free while it is allocated at 
the TaskExecutor. When allocation fails, it is marked as free and then the 
request is retried immediately. It must succeed eventually if the initial 
decision to allocate the slot was correct. However, we need to explicitly check 
whether a TaskExecutor has deregistered, to make sure old TaskExecutors don't 
send failures which trigger slot allocation of already removed slots (due to 
TaskExecutor deregistration). That should be fixed with this PR.

> 1. We can remove the update status part entirely, since it can only do 
new slot registration now, we can just move it to the task executor first 
registration.

Very good suggestion. Let's move the initial registration and 
reconciliation of slots to the registration message.

To wrap up, let's change the following:

1. Move the slot registration and allocation report to the registration of 
the TaskExecutor
2. Let the TaskExecutor immediately notify the ResourceManager once a slot 
becomes free
3. Change the fencing in handleSlotRequestFailedAtTaskManager to protect 
against TaskExecutors which are not registered anymore.

Let me know if that would work for you.
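
The first wrap-up item, carrying the slot report in the registration message, might look roughly like this. The sketch assumes the report is a list of (slot id, allocation id or null) pairs. `RegistrationWithSlotReport` and `SlotStatus` are hypothetical, not Flink's `SlotReport` classes.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical RM-side registration that consumes the slot report in one step.
class RegistrationWithSlotReport {
    /** One entry per slot; allocationId == null means the slot is free. */
    static final class SlotStatus {
        final String slotId;
        final String allocationId;

        SlotStatus(String slotId, String allocationId) {
            this.slotId = slotId;
            this.allocationId = allocationId;
        }
    }

    private final Map<String, String> registeredTaskExecutors = new HashMap<>(); // resourceId -> instanceId
    private final Set<String> freeSlots = new HashSet<>();
    private final Map<String, String> allocations = new HashMap<>();             // slotId -> allocationId

    /** Registers the TaskExecutor and all of its slots; no separate status update is needed. */
    void registerTaskExecutor(String resourceId, String instanceId, List<SlotStatus> slotReport) {
        registeredTaskExecutors.put(resourceId, instanceId);
        for (SlotStatus status : slotReport) {
            if (status.allocationId == null) {
                freeSlots.add(status.slotId);
            } else {
                allocations.put(status.slotId, status.allocationId);
            }
        }
    }

    boolean isFree(String slotId) {
        return freeSlots.contains(slotId);
    }

    String allocationOf(String slotId) {
        return allocations.get(slotId);
    }
}
```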





[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-09-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15535299#comment-15535299
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

Github user KurtYoung commented on the issue:

https://github.com/apache/flink/pull/2571
  
@mxm Thanks for the simplification, I like the idea. When I wrote the first 
version of the SlotManager, I noticed that I might have made things too 
complicated, but I couldn't figure out how to simplify them. 

As it turns out, your modification covers two main problems I have faced:
1. What information to exchange during heartbeats, and what actions we should 
take
2. What action we should take when the allocation fails at the TaskManager

But what I really want to find out is: in this case, is there a simple 
paradigm which we can follow to make the whole thing clear and robust? What I 
previously chose was: take actions based on my newest runtime information. But 
as you can see, it led me to a very complex solution: each time I decided what 
action should be taken, I had to check all related information and consider 
all possibilities, even ones where it was hard to see how they could happen.

Your modification gives me some ideas; maybe we can simplify it in the 
following ways:
1. RM and TM only exchange information when needed (so heartbeats don't sync 
status)
2. TM only reports information which it can change by itself (like a slot 
becoming free again)

Here are some thoughts about the modification:
1. We can remove the update status part entirely: since it can only do new 
slot registration now, we can just move it to the TaskExecutor's first 
registration.
2. Once a slot becomes free in the TM, notify the RM.
3. The TM should attach the slot usage when rejecting an allocation from the RM.

Here are some minor problems I found in this modification:
a. As beyond1920 mentioned, we don't have a way to find out that a slot has 
become free (this can be solved by 2).
b. When we handleSlotRequestFailedAtTaskManager, we will make this slot 
free again. If the slot is occupied by some other task now, all allocations on 
this slot will keep failing (this can be fixed by 3).
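
Fix 3 can be sketched as follows. The sketch assumes the decline reply carries the allocation the TaskExecutor currently holds for the slot (null if the slot is free). `DeclineWithUsage` is a hypothetical name. The ResourceManager adopts the TaskExecutor's view instead of assuming the slot is free, which avoids the repeated failures described in b.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical RM-side handling of a decline that carries the slot's current usage.
class DeclineWithUsage {
    private final Set<String> freeSlots = new HashSet<>();
    private final Map<String, String> allocations = new HashMap<>(); // slotId -> allocationId

    /** currentAllocation is what the TaskExecutor reports for the slot; null means free. */
    void onDecline(String slotId, String currentAllocation) {
        if (currentAllocation != null) {
            allocations.put(slotId, currentAllocation); // occupied elsewhere: do not offer it again
            freeSlots.remove(slotId);
        } else {
            allocations.remove(slotId);
            freeSlots.add(slotId);
        }
    }

    boolean isFree(String slotId) {
        return freeSlots.contains(slotId);
    }

    String allocationOf(String slotId) {
        return allocations.get(slotId);
    }
}
```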





[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15534800#comment-15534800
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/2571
  
@mxm, what would happen in the following case: the TaskExecutor releases a 
registered slot, then reports its latest SlotReport to the ResourceManager? The 
ResourceManager should remove the old slot allocation from its own view and 
mark the slot as free. So I think we should keep the following code from the 
old SlotManager's `updateSlotStatus`:

    else {
        // slot is reported empty

        // check whether we also thought this slot is empty
        if (allocationMap.isAllocated(slotId)) {
            LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:null",
                slotId, allocationMap.getAllocationID(slotId));

            // we thought the slot is in use, correct it
            allocationMap.removeAllocation(slotId);

            // we have a free slot!
            handleFreeSlot(slot);
        }
    }
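
For reference, here is a self-contained version of that branch. `SlotReportReconciler` is hypothetical. It replaces `allocationMap`, `handleFreeSlot` and `LOG` with a plain map, a list and `System.out`, and models only the "reported empty" case from the quoted code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the old SlotManager's handling of a slot reported as empty.
class SlotReportReconciler {
    private final Map<String, String> allocationBySlot = new HashMap<>(); // slotId -> allocationId
    private final List<String> freeSlots = new ArrayList<>();

    void allocate(String slotId, String allocationId) {
        allocationBySlot.put(slotId, allocationId);
    }

    /** reportedAllocation == null means the TaskExecutor reports the slot as empty. */
    void updateSlotStatus(String slotId, String reportedAllocation) {
        if (reportedAllocation == null && allocationBySlot.containsKey(slotId)) {
            // We thought the slot was in use, but the TaskExecutor says it is empty: correct our view.
            System.out.printf("Slot allocation info mismatch! SlotID:%s, current:%s, reported:null%n",
                    slotId, allocationBySlot.get(slotId));
            allocationBySlot.remove(slotId);
            handleFreeSlot(slotId);
        }
    }

    private void handleFreeSlot(String slotId) {
        freeSlots.add(slotId);
    }

    boolean isFree(String slotId) {
        return freeSlots.contains(slotId);
    }
}
```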




[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15533204#comment-15533204
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2571
  
CC @beyond1920 @KurtYoung 




[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15533113#comment-15533113
 ] 

ASF GitHub Bot commented on FLINK-4348:
---

GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/2571

[FLINK-4348] Simplify logic of SlotManager

This pull request is split up into two commits:

1. It removes some code from the `SlotManager` to make it simpler. 
  - It makes use of `handleSlotRequestFailedAtTaskManager` and simplifies 
it because we can assume that a Slot is only registered once if we talk to the 
same instance of a TaskExecutor. Further, we can skip checking the free slots 
because we previously removed the slot from the free slots.
  - `updateSlotStatus` only deals with new Task slots. All other updates 
are performed by the `ResourceManager`. In case the ResourceManager crashes, 
it will re-register all slot statuses.

2. It fences `TaskExecutor` messages using an `InstanceID` which is 
required to make 1 work correctly. New messages have been introduced to achieve 
that.
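
A minimal sketch of fencing by `InstanceID`. It assumes the ResourceManager hands out a fresh instance id on every (re-)registration of a `ResourceID` and drops messages carrying any other id. `InstanceFencing` is hypothetical, and a counter stands in for a real unique id.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical fencing of TaskExecutor messages by instance id.
class InstanceFencing {
    private final Map<String, String> currentInstance = new HashMap<>(); // resourceId -> instanceId
    private int counter = 0;

    /** Each registration, including a re-registration, gets a new instance id. */
    String register(String resourceId) {
        String instanceId = resourceId + "#" + (++counter);
        currentInstance.put(resourceId, instanceId);
        return instanceId;
    }

    void deregister(String resourceId) {
        currentInstance.remove(resourceId);
    }

    /** True only if the message comes from the currently registered instance. */
    boolean accept(String resourceId, String instanceId) {
        return instanceId.equals(currentInstance.get(resourceId));
    }
}
```

With this check, a failure reported by an old or deregistered TaskExecutor is discarded before it can trigger a new slot allocation.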

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink flip-6

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2571.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2571


commit 1d297403e5e085d944d80b498c4a269a794f8e2f
Author: Maximilian Michels 
Date:   2016-09-29T13:08:32Z

[FLINK-4347] change assumptions in SlotManager allocation

- Makes use of `handleSlotRequestFailedAtTaskManager` and simplifies it
because we can assume that a slot is only registered once if we talk to
the same instance of a TaskExecutor. Further, we can skip checking the
free slots because the slot was previously removed from the free slots.

- `updateSlotStatus` only deals with new task slots. All other updates
are performed by the `ResourceManager`. In case the ResourceManager
crashes, all slot statuses are re-registered.
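The two assumptions in this commit can be sketched as simplified SlotManager bookkeeping. This is a hypothetical model, not Flink's `SlotManager`: slot and allocation IDs are plain strings, `requestSlot` is an invented helper, and the method names `updateSlotStatus` and `handleSlotRequestFailedAtTaskManager` are borrowed from the commit message only to show where each assumption applies.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical, heavily simplified model of SlotManager bookkeeping;
// IDs are plain strings instead of Flink's SlotID/AllocationID classes.
final class SimpleSlotManager {
    final Set<String> freeSlots = new LinkedHashSet<>();        // slotIds, in registration order
    final Map<String, String> allocatedSlots = new HashMap<>(); // slotId -> allocationId

    /** Only new slots are registered; slots that are already known are ignored. */
    void updateSlotStatus(String slotId) {
        if (!freeSlots.contains(slotId) && !allocatedSlots.containsKey(slotId)) {
            freeSlots.add(slotId);
        }
    }

    /** Takes the first free slot out of the pool and records the allocation. */
    Optional<String> requestSlot(String allocationId) {
        Iterator<String> it = freeSlots.iterator();
        if (!it.hasNext()) {
            return Optional.empty();
        }
        String slotId = it.next();
        it.remove(); // the slot leaves the free pool before the TaskExecutor is asked
        allocatedSlots.put(slotId, allocationId);
        return Optional.of(slotId);
    }

    /** Decline or timeout reported for (slotId, allocationId): clear it and retry. */
    Optional<String> handleSlotRequestFailedAtTaskManager(String slotId, String allocationId) {
        // No need to check freeSlots: requestSlot() already removed slotId from it.
        allocatedSlots.remove(slotId, allocationId);
        return requestSlot(allocationId);
    }
}
```

In this sketch the failed slot is neither free nor allocated after the retry, so the retry cannot pick it again; it only becomes usable once the TaskExecutor reports it in a later `updateSlotStatus` call.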

commit c6768524c869ecdb84f0a8ae48837afc329c9714
Author: Maximilian Michels 
Date:   2016-09-29T14:03:39Z

[FLINK-4348] discard message from old TaskExecutorGateways

This fences old messages using the InstanceID of the TaskExecutor.




> Implement slot allocation protocol with TaskExecutor
> 
>
> Key: FLINK-4348
> URL: https://issues.apache.org/jira/browse/FLINK-4348
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: Maximilian Michels
>

[jira] [Commented] (FLINK-4348) Implement slot allocation protocol with TaskExecutor

2016-09-01 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15455239#comment-15455239
 ] 

Maximilian Michels commented on FLINK-4348:
---

[~jinyu.zj] Could I take this over while you're working on the registration 
jira issues?

> Implement slot allocation protocol with TaskExecutor
> 
>
> Key: FLINK-4348
> URL: https://issues.apache.org/jira/browse/FLINK-4348
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Kurt Young
>Assignee: zhangjing
>
> When the slotManager finds a proper slot in the free pool for a slot request, 
> it marks the slot as occupied, then tells the taskExecutor to give the slot 
> to the specified JobMaster. 
> When a slot request is sent to the taskExecutor, it should contain the 
> following parameters: AllocationID, JobID, slotID, resourceManagerLeaderSessionID. 
> There are three possible responses from the taskExecutor; below we discuss 
> when each one happens and how to handle it.
> 1. Ack: the taskExecutor gives the slot to the specified jobMaster as expected. 
> 2. Decline: the slot is already occupied by another AllocationID. 
> 3. Timeout: caused by a lost request message, a lost response message, 
> or slow network transfer. 
> In the first case, the ResourceManager needs to do nothing. In the second and 
> third cases, however, the ResourceManager needs to notify the slotManager, which 
> first verifies and clears all previous allocation information for this slot 
> request, then tries to find a proper slot for the request again. This may cause 
> duplicate allocation: e.g. the slot request to a TaskManager succeeds but the 
> response is lost, so we may request a slot from another TaskManager, which 
> assigns two slots to one request. This can be taken care of by rejecting the 
> registration at the JobMaster.
> Some questions still need further discussion.
> 1. Who sends the slotRequest to the taskExecutor, the SlotManager or the 
> ResourceManager? I think it's better that the SlotManager delegates the RPC 
> call to the ResourceManager whenever it needs to communicate with the outside 
> world. The ResourceManager knows which taskExecutor to send the request to 
> based on the ResourceID. Besides, this RPC call, which is used to request a 
> slot from the taskExecutor, should not be an RpcMethod, because only the 
> SlotManager should have permission to call it; other components, for example 
> the JobMaster and TaskExecutor, should not be able to call it directly.
> 2. If the JobMaster rejects the slot offer from a TaskExecutor, should the 
> TaskExecutor notify the ResourceManager of the free slot immediately, or wait 
> for the next heartbeat sync? The advantage of the first way is that the 
> ResourceManager's view is updated faster. The advantage of the second way is 
> that it saves an RPC method in the ResourceManager.
> 3. There are two communication types. First, the slot request could be sent 
> as an ask operation where the response is returned as a future. Second, the 
> ResourceManager could send the slot request in a fire-and-forget way, and the 
> response could be returned by an RPC call. I prefer the first one because it 
> is simpler and saves an RPC method in the ResourceManager (for the callback in 
> the second way).
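The ask-based variant described in point 3, together with the three response cases and the duplicate rejection at the JobMaster, could look roughly like the following Java sketch. Everything here is hypothetical and simplified: the TaskExecutor gateway is modelled as a `Function` returning a `CompletableFuture`, the timeout relies on `CompletableFuture.orTimeout` (Java 9+), the record needs Java 16+, the leader session ID is a plain string, and `SlotRequest`, `SlotResponse`, `SlotOutcome`, `SlotRequester`, and `JobMasterSlotRegistry` are illustrative names, not Flink APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical message and result types (not Flink's classes).
record SlotRequest(String allocationId, String jobId, String slotId, String rmLeaderSessionId) {}

enum SlotResponse { ACK, DECLINE }

enum SlotOutcome { ACK, DECLINE, TIMEOUT }

final class SlotRequester {
    /** Sends the request as an "ask" and maps the answer (or its absence) to an outcome. */
    static CompletableFuture<SlotOutcome> ask(
            Function<SlotRequest, CompletableFuture<SlotResponse>> taskExecutor,
            SlotRequest request,
            long timeoutMillis) {
        return taskExecutor.apply(request)
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                .handle((response, error) -> {
                    if (error != null) {
                        // No answer in time (lost request/response, slow network)
                        // or any other failure: treated like a timeout.
                        return SlotOutcome.TIMEOUT;
                    }
                    return response == SlotResponse.ACK ? SlotOutcome.ACK : SlotOutcome.DECLINE;
                });
    }

    /** Case 1 needs no action; cases 2 and 3 go back to the SlotManager for a retry. */
    static boolean needsRetry(SlotOutcome outcome) {
        return outcome != SlotOutcome.ACK;
    }
}

/** JobMaster side: at most one slot per AllocationID; duplicates are rejected. */
final class JobMasterSlotRegistry {
    private final Map<String, String> slotByAllocation = new HashMap<>();

    boolean offerSlot(String allocationId, String slotId) {
        String existing = slotByAllocation.putIfAbsent(allocationId, slotId);
        return existing == null || existing.equals(slotId);
    }
}
```

With the ask pattern, the timeout is a property of the returned future on the caller's side, so no extra callback RPC is needed in the ResourceManager; a second slot offered for the same AllocationID after a lost response is simply refused by the JobMaster.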


