[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15506081#comment-15506081
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user mxm closed the pull request at:

https://github.com/apache/flink/pull/2463


> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15506080#comment-15506080
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2463
  
Merged to continue development in other places. Thanks for the comments!




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477565#comment-15477565
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2463
  
I've rebased the pull request and incorporated your suggestions.




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477560#comment-15477560
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r78212144
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ---
@@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway resourceManagerGateway) {
 * RPC's main thread to avoid race condition).
 *
 * @param request The detailed request of the slot
+* @return SlotRequestRegistered The confirmation message to be send to the caller
 */
-   public void requestSlot(final SlotRequest request) {
+   public SlotRequestRegistered requestSlot(final SlotRequest request) {
+   final AllocationID allocationId = request.getAllocationId();
if (isRequestDuplicated(request)) {
-   LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId());
-   return;
+   LOG.warn("Duplicated slot request, AllocationID:{}", allocationId);
+   return null;
}
 
// try to fulfil the request with current free slots
-   ResourceSlot slot = chooseSlotToUse(request, freeSlots);
+   final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
if (slot != null) {
LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
-   request.getAllocationId(), request.getJobId());
+   allocationId, request.getJobId());
 
// record this allocation in bookkeeping
-   allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId());
+   allocationMap.addAllocation(slot.getSlotId(), allocationId);
 
// remove selected slot from free pool
freeSlots.remove(slot.getSlotId());
 
-   // TODO: send slot request to TaskManager
+   slot.getTaskExecutorGateway()
+   .requestSlot(allocationId, leaderIdRegistry.getLeaderID());
--- End diff --

Thank you for your comments @beyond1920. Your observations are correct. 
I've skipped this part of the implementation and wanted to address it next.




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477252#comment-15477252
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r78194763
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java
 ---
@@ -15,11 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.runtime.highavailability;
 
-package org.apache.flink.runtime.resourcemanager;
+import java.util.UUID;
 
-import java.io.Serializable;
+/**
+ * Registry class to keep track of the current leader ID.
+ */
+public class LeaderIdRegistry {
--- End diff --

These are valid points, I will change the code to use the 
`LeaderRetrievalListener` instead.
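For illustration, a minimal sketch of the listener-based approach, assuming a hypothetical `LeaderListenerSketch` interface shaped loosely after `LeaderRetrievalListener` (a callback that delivers the leader address and the leader session ID). The consuming component only receives the session ID through the callback and has no way to change it for other components:

```java
import java.util.UUID;

// Hypothetical callback interface, modeled loosely on a leader-retrieval listener:
// the leader address and leader session ID are pushed to the component.
interface LeaderListenerSketch {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
    void handleError(Exception exception);
}

// A component that no longer shares a mutable registry: it learns the
// leader session ID only via the callback and cannot change it itself.
class SlotRequestSenderSketch implements LeaderListenerSketch {
    // written by the notifying thread, read by the component's own thread
    private volatile UUID currentLeaderSessionId;
    private volatile String currentLeaderAddress;

    @Override
    public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId) {
        this.currentLeaderAddress = leaderAddress;
        this.currentLeaderSessionId = leaderSessionId;
    }

    @Override
    public void handleError(Exception exception) {
        // sketch only: forget the leader so no request is stamped with a stale ID
        this.currentLeaderAddress = null;
        this.currentLeaderSessionId = null;
    }

    /** Session ID to attach to an outgoing slot request, or null if no leader is known. */
    UUID leaderSessionIdForRequest() {
        return currentLeaderSessionId;
    }
}
```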




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473326#comment-15473326
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77970827
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -52,15 +58,28 @@
  * 
  */
 public class ResourceManager extends RpcEndpoint {
-   private final Map jobMasterGateways;
+
+   private final Logger LOG = LoggerFactory.getLogger(getClass());
--- End diff --

Good point. Will use that one instead.




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15472686#comment-15472686
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77943543
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -52,15 +58,28 @@
  * 
  */
 public class ResourceManager extends RpcEndpoint {
-   private final Map jobMasterGateways;
+
+   private final Logger LOG = LoggerFactory.getLogger(getClass());
--- End diff --

There is a protected log field in RpcEndpoint; why not use that instead?
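A minimal sketch of the suggestion, assuming the base class exposes a protected logger. The classes below are simplified stand-ins, and `java.util.logging` is used only to keep the example self-contained (the code under review uses SLF4J). Because the field is initialized with `getClass()`, log records are attributed to the concrete subclass rather than to the base class:

```java
import java.util.logging.Logger;

// Simplified stand-in for the RPC endpoint base class.
abstract class BaseEndpointSketch {
    // getClass() returns the runtime subclass, so the logger is named after it
    protected final Logger log = Logger.getLogger(getClass().getName());
}

// The subclass declares no LOG field of its own and reuses the inherited one.
class ResourceManagerSketch extends BaseEndpointSketch {
    String loggerName() {
        log.fine("using inherited logger");
        return log.getName();
    }
}
```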




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470114#comment-15470114
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77787126
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java
 ---
@@ -15,11 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.runtime.highavailability;
 
-package org.apache.flink.runtime.resourcemanager;
+import java.util.UUID;
 
-import java.io.Serializable;
+/**
+ * Registry class to keep track of the current leader ID.
+ */
+public class LeaderIdRegistry {
--- End diff --

The class has some docs but, as you can see from my initial question, its purpose was not clear to me.

Yes, I actually thought about marking `leaderSessionID` `volatile`. 

Given the interface of this class, every component that has a reference to this registry is allowed to change the leader session ID. This can be problematic because components other than the `ResourceManager` should only be allowed to retrieve the leader session ID.

I'm actually wondering whether we need to notify the components about a new leader session ID anyway. For example, the `SlotManager` should probably free its registered slots when it loses the leadership. Wouldn't these calls be suitable for transmitting the current leader session ID?
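The two alternatives raised here can be sketched as follows; all names are hypothetical. Other components receive only a read-only `LeaderIdView`, and leadership changes are pushed to the slot manager explicitly, so that it can drop its registered slots on revocation and learn the new session ID on grant. Since `setLeaderSessionId` is package-private, code in other packages that holds only the view cannot change the ID:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Read-only view handed to components that only need to read the ID.
interface LeaderIdView {
    UUID getLeaderSessionId();
}

// Only the owner (e.g. the ResourceManager) holds the writable instance.
final class LeaderIdHolder implements LeaderIdView {
    private volatile UUID leaderSessionId; // volatile: readers see the latest write

    @Override
    public UUID getLeaderSessionId() {
        return leaderSessionId;
    }

    // package-private: not callable from other packages through the view
    void setLeaderSessionId(UUID id) {
        this.leaderSessionId = id;
    }
}

// Alternative: the owner notifies the component about leadership changes.
class SlotManagerSketch {
    private final Set<String> registeredSlots = new HashSet<>();
    private UUID leaderSessionId; // assumed to be touched only from the owner's main thread

    void registerSlot(String slotId) {
        registeredSlots.add(slotId);
    }

    void onLeadershipGranted(UUID newLeaderSessionId) {
        this.leaderSessionId = newLeaderSessionId;
    }

    void onLeadershipRevoked() {
        // losing leadership invalidates all slot bookkeeping
        registeredSlots.clear();
        this.leaderSessionId = null;
    }

    int numRegisteredSlots() {
        return registeredSlots.size();
    }

    UUID currentLeaderSessionId() {
        return leaderSessionId;
    }
}
```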




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470065#comment-15470065
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77783497
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java
 ---
@@ -15,11 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.runtime.highavailability;
 
-package org.apache.flink.runtime.resourcemanager;
+import java.util.UUID;
 
-import java.io.Serializable;
+/**
+ * Registry class to keep track of the current leader ID.
+ */
+public class LeaderIdRegistry {
--- End diff --

What exactly do you mean? The class is thread-safe and documented (though 
documentation can be improved). There is no need for locking. Do you mean 
marking the leaderSessionID `volatile`? It should be fine if leader changes 
propagate lazily.




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469801#comment-15469801
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77769044
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ---
@@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway resourceManagerGateway) {
 * RPC's main thread to avoid race condition).
 *
 * @param request The detailed request of the slot
+* @return SlotRequestRegistered The confirmation message to be send to the caller
 */
-   public void requestSlot(final SlotRequest request) {
+   public SlotRequestRegistered requestSlot(final SlotRequest request) {
+   final AllocationID allocationId = request.getAllocationId();
if (isRequestDuplicated(request)) {
-   LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId());
-   return;
+   LOG.warn("Duplicated slot request, AllocationID:{}", allocationId);
+   return null;
}
 
// try to fulfil the request with current free slots
-   ResourceSlot slot = chooseSlotToUse(request, freeSlots);
+   final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
if (slot != null) {
LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
-   request.getAllocationId(), request.getJobId());
+   allocationId, request.getJobId());
 
// record this allocation in bookkeeping
-   allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId());
+   allocationMap.addAllocation(slot.getSlotId(), allocationId);
 
// remove selected slot from free pool
freeSlots.remove(slot.getSlotId());
 
-   // TODO: send slot request to TaskManager
+   slot.getTaskExecutorGateway()
+   .requestSlot(allocationId, leaderIdRegistry.getLeaderID());
--- End diff --

There are three possible responses from the TaskExecutor:
1. Ack: the TaskExecutor gives the slot to the specified JobMaster as expected.
2. Decline: the slot is already occupied by another AllocationID.
3. Timeout: caused by a lost request or response message, or by slow network transfer.
In the first case, the SlotManager does not need to do anything. In the second and third cases, however, the SlotManager should first verify and clear all previous allocation information for this slot request and then try to find a suitable slot for the request again. I think we should add logic to handle these three possible responses from the TaskExecutor.
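A minimal sketch of handling the three responses, assuming simplified `String` IDs and a hypothetical `TaskExecutorReply` enum in place of the real RPC futures and timeout handling. On a decline or a timeout, the bookkeeping for the request is cleared and the request is retried against the remaining free slots; the failed slot is deliberately not put back into the free pool:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the TaskExecutor's answer (a timeout is modeled as a value).
enum TaskExecutorReply { ACK, DECLINE, TIMEOUT }

class SlotAllocatorSketch {
    private final Deque<String> freeSlots = new ArrayDeque<>();        // free slot IDs
    private final Map<String, String> allocations = new HashMap<>();   // slotId -> allocationId
    private final Deque<String> pendingRequests = new ArrayDeque<>();  // requests waiting for a slot

    void addFreeSlot(String slotId) {
        freeSlots.add(slotId);
    }

    /** Picks a free slot, records the allocation and returns the slot ID, or null if none is free. */
    String requestSlot(String allocationId) {
        String slotId = freeSlots.poll();
        if (slotId == null) {
            pendingRequests.add(allocationId);
            return null;
        }
        allocations.put(slotId, allocationId);
        // a real SlotManager would now send the request to the TaskExecutor
        return slotId;
    }

    /** Reacts to the TaskExecutor's reply for a previously assigned slot. */
    String handleReply(String slotId, String allocationId, TaskExecutorReply reply) {
        switch (reply) {
            case ACK:
                // case 1: the slot went to the JobMaster as expected, nothing to do
                return slotId;
            case DECLINE: // case 2: slot already occupied by another AllocationID
            case TIMEOUT: // case 3: lost request/response or slow network
            default:
                // verify and clear the previous bookkeeping for this request ...
                if (allocationId.equals(allocations.get(slotId))) {
                    allocations.remove(slotId);
                }
                // ... then try to find another slot for the same request
                return requestSlot(allocationId);
        }
    }

    boolean isAllocated(String slotId) {
        return allocations.containsKey(slotId);
    }

    boolean isPending(String allocationId) {
        return pendingRequests.contains(allocationId);
    }
}
```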




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469776#comment-15469776
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77768256
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ---
@@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway resourceManagerGateway) {
 * RPC's main thread to avoid race condition).
 *
 * @param request The detailed request of the slot
+* @return SlotRequestRegistered The confirmation message to be send to the caller
 */
-   public void requestSlot(final SlotRequest request) {
+   public SlotRequestRegistered requestSlot(final SlotRequest request) {
+   final AllocationID allocationId = request.getAllocationId();
if (isRequestDuplicated(request)) {
-   LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId());
-   return;
+   LOG.warn("Duplicated slot request, AllocationID:{}", allocationId);
+   return null;
}
 
// try to fulfil the request with current free slots
-   ResourceSlot slot = chooseSlotToUse(request, freeSlots);
+   final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
if (slot != null) {
LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
-   request.getAllocationId(), request.getJobId());
+   allocationId, request.getJobId());
 
// record this allocation in bookkeeping
-   allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId());
+   allocationMap.addAllocation(slot.getSlotId(), allocationId);
 
// remove selected slot from free pool
freeSlots.remove(slot.getSlotId());
 
-   // TODO: send slot request to TaskManager
+   slot.getTaskExecutorGateway()
+   .requestSlot(allocationId, leaderIdRegistry.getLeaderID());
--- End diff --

The ResourceManager keeps a mapping from ResourceID to TaskExecutorGateway. Maybe we could fetch the TaskExecutorGateway by ResourceID through the ResourceManager here?
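A sketch of looking the gateway up on demand, assuming a hypothetical `GatewayRegistrySketch` that stands in for the ResourceManager's ResourceID-to-gateway mapping (IDs simplified to `String`, and `TaskExecutorGatewaySketch` reduced to a single method):

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical minimal gateway interface; the real gateway has a richer API.
interface TaskExecutorGatewaySketch {
    String address();
}

class GatewayRegistrySketch {
    // ResourceID -> gateway of the registered TaskExecutor
    private final Map<String, TaskExecutorGatewaySketch> taskExecutors = new ConcurrentHashMap<>();

    void registerTaskExecutor(String resourceId, TaskExecutorGatewaySketch gateway) {
        taskExecutors.put(resourceId, gateway);
    }

    void unregisterTaskExecutor(String resourceId) {
        taskExecutors.remove(resourceId);
    }

    /** Resolves the gateway at request time instead of storing it in every slot. */
    Optional<TaskExecutorGatewaySketch> getTaskExecutorGateway(String resourceId) {
        return Optional.ofNullable(taskExecutors.get(resourceId));
    }
}
```

Keeping the mapping in one place means a slot cannot hold on to a stale gateway after a TaskExecutor re-registers; an empty result signals that the TaskExecutor is no longer known.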




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467602#comment-15467602
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77650617
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.highavailability.LeaderIdRegistry;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+public class SlotProtocolTest {
+
+   private static TestingRpcService testRpcService;
+
+   @BeforeClass
+   public static void beforeClass() {
+   testRpcService = new TestingRpcService();
+
+   }
+
+   @AfterClass
+   public static void afterClass() {
+   testRpcService.stopService();
+   testRpcService = null;
+   }
+
+   @Before
+   public void beforeTest(){
+   testRpcService.clearGateways();
+   }
+
+   /**
+* Tests whether
+* 1) SlotRequest is routed to the SlotManager
+* 2) SlotRequest leads to a container allocation
+* 3) SlotRequest is confirmed
+* 4) Slot becomes available and TaskExecutor gets a SlotRequest
+*/
+   @Test
+   public void testSlotsUnavailableRequest() throws Exception {
+   final String rmAddress = "/rm1";
+   final String jmAddress = "/jm1";
+   final JobID jobID = new JobID();
+
+   testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
+
+
+   TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
+   ResourceManager resourceManager =
+   new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager);
+   resourceManager.start();
+
+   Future registrationFuture =
+   resourceManager.registerJobMaster(new 
JobMasterRegistration(jmAddress, 

[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467612#comment-15467612
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77651399
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -52,15 +58,28 @@
  * 
  */
 public class ResourceManager extends RpcEndpoint {
-   private final Map jobMasterGateways;
+
+   private final Logger LOG = LoggerFactory.getLogger(getClass());
--- End diff --

But then this field should probably be marked as `protected` instead of `private`.




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467609#comment-15467609
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77651201
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.highavailability.LeaderIdRegistry;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+public class SlotProtocolTest {
+
+   private static TestingRpcService testRpcService;
+
+   @BeforeClass
+   public static void beforeClass() {
+   testRpcService = new TestingRpcService();
+
+   }
+
+   @AfterClass
+   public static void afterClass() {
+   testRpcService.stopService();
+   testRpcService = null;
+   }
+
+   @Before
+   public void beforeTest(){
+   testRpcService.clearGateways();
+   }
+
+   /**
+* Tests whether
+* 1) SlotRequest is routed to the SlotManager
+* 2) SlotRequest leads to a container allocation
+* 3) SlotRequest is confirmed
+* 4) Slot becomes available and TaskExecutor gets a SlotRequest
+*/
+   @Test
+   public void testSlotsUnavailableRequest() throws Exception {
+   final String rmAddress = "/rm1";
+   final String jmAddress = "/jm1";
+   final JobID jobID = new JobID();
+
+   testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
+
+
+   TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
+   ResourceManager resourceManager =
+   new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager);
+   resourceManager.start();
+
+   Future registrationFuture =
+   resourceManager.registerJobMaster(new 
JobMasterRegistration(jmAddress, 

[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467599#comment-15467599
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77650487
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.highavailability.LeaderIdRegistry;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+public class SlotProtocolTest {
--- End diff --

Should extend `TestLogger`




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467596#comment-15467596
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77650313
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java
 ---
@@ -15,11 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.runtime.highavailability;
 
-package org.apache.flink.runtime.resourcemanager;
+import java.util.UUID;
 
-import java.io.Serializable;
+/**
+ * Registry class to keep track of the current leader ID.
+ */
+public class LeaderIdRegistry {
--- End diff --

Alright, but then this class should be made thread-safe, and the docs should 
state its purpose.
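Here is a minimal sketch of a thread-safe variant. It assumes the registry only needs to hold the latest leader UUID. A `volatile` field makes an update from the leader-election callback visible to any RPC thread that reads it afterwards. The method names (`setLeaderID`, `getLeaderID`, `isCurrentLeader`) are hypothetical and are not taken from the PR.

```java
import java.util.UUID;

/**
 * Hypothetical sketch of a thread-safe leader ID registry.
 *
 * <p>Purpose: a single shared holder for the current leader session ID, so that
 * components wired up before a leader change can still read the latest ID.
 */
final class LeaderIdRegistry {

	// volatile: a write from the leader-election callback happens-before any
	// later read of this field from an RPC thread
	private volatile UUID leaderID;

	LeaderIdRegistry(UUID initialLeaderID) {
		this.leaderID = initialLeaderID;
	}

	/** Updates the leader ID, e.g. when leadership is (re)granted. */
	void setLeaderID(UUID newLeaderID) {
		this.leaderID = newLeaderID;
	}

	/** Returns the most recently set leader ID (may be null). */
	UUID getLeaderID() {
		return leaderID;
	}

	/** Checks an incoming leader ID against the current one. */
	boolean isCurrentLeader(UUID candidate) {
		UUID current = leaderID; // read the volatile field once
		return current != null && current.equals(candidate);
	}
}
```

Updates might later need to be conditional, for example replacing only a specific old ID. In that case, an `AtomicReference<UUID>` with `compareAndSet` would be the natural next step.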




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467390#comment-15467390
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77630918
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.highavailability.LeaderIdRegistry;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+public class SlotProtocolTest {
+
+   private static TestingRpcService testRpcService;
+
+   @BeforeClass
+   public static void beforeClass() {
+   testRpcService = new TestingRpcService();
+
+   }
+
+   @AfterClass
+   public static void afterClass() {
+   testRpcService.stopService();
+   testRpcService = null;
+   }
+
+   @Before
+   public void beforeTest(){
+   testRpcService.clearGateways();
+   }
+
+   /**
+* Tests whether
+* 1) SlotRequest is routed to the SlotManager
+* 2) SlotRequest leads to a container allocation
+* 3) SlotRequest is confirmed
+* 4) Slot becomes available and TaskExecutor gets a SlotRequest
+*/
+   @Test
+   public void testSlotsUnavailableRequest() throws Exception {
+   final String rmAddress = "/rm1";
+   final String jmAddress = "/jm1";
+   final JobID jobID = new JobID();
+
+   testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
+
+
+   TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager());
+   ResourceManager resourceManager =
+   new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager);
+   resourceManager.start();
+
+   Future registrationFuture =
+   resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID));

[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467382#comment-15467382
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77630351
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 ---

[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467379#comment-15467379
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r7763
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 ---
@@ -32,4 +33,11 @@
// 

 
void notifyOfNewResourceManagerLeader(String address, UUID resourceManagerLeaderId);
+
+   /**
+* Send by the ResourceManager to the TaskExecutor
+* @param allocationID id for the request
+* @param resourceManagerLeaderID current leader id of the ResourceManager
+*/
+   void requestSlot(AllocationID allocationID, UUID resourceManagerLeaderID);
--- End diff --

As of now, this is just a stub, but we will have to acknowledge the message. 
I will change the signature to make that clear.
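The acknowledgement could travel back through the return value of `requestSlot`, as asked in the review. The sketch below is hypothetical:

- `SlotRequestReply`, `TaskExecutorGatewaySketch` and `FakeTaskExecutor` are illustrative names.
- A plain `String` stands in for `AllocationID`.
- `java.util.concurrent.CompletableFuture` keeps the example self-contained, whereas the RPC code in this PR works with Scala `Future`s.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical acknowledgement message sent back to the ResourceManager.
final class SlotRequestReply {
	final String allocationID; // stand-in for Flink's AllocationID
	final boolean accepted;

	SlotRequestReply(String allocationID, boolean accepted) {
		this.allocationID = allocationID;
		this.accepted = accepted;
	}
}

// Hypothetical gateway signature: the reply is the (asynchronous) return value.
interface TaskExecutorGatewaySketch {
	CompletableFuture<SlotRequestReply> requestSlot(String allocationID, UUID resourceManagerLeaderID);
}

// Toy TaskExecutor that only accepts requests from the leader it knows about.
final class FakeTaskExecutor implements TaskExecutorGatewaySketch {
	private final UUID knownLeaderID;

	FakeTaskExecutor(UUID knownLeaderID) {
		this.knownLeaderID = knownLeaderID;
	}

	@Override
	public CompletableFuture<SlotRequestReply> requestSlot(String allocationID, UUID resourceManagerLeaderID) {
		boolean accepted = knownLeaderID.equals(resourceManagerLeaderID);
		return CompletableFuture.completedFuture(new SlotRequestReply(allocationID, accepted));
	}
}
```

With this shape, the SlotManager can mark the slot as allocated only once the returned future completes with `accepted == true`.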




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467376#comment-15467376
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77629858
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
 ---
@@ -46,13 +47,21 @@
/** if the slot is allocated, jobId identify which job this slot is allocated to; else, jobId is null */
private final JobID jobID;
 
-   public SlotStatus(SlotID slotID, ResourceProfile profiler) {
-   this(slotID, profiler, null, null);
+   /** Gateway to the TaskManager which reported the SlotStatus */
+   private final TaskExecutorGateway taskExecutorGateway;
--- End diff --

It comes with the SlotReport from the TaskExecutor. Yes, it breaks 
serializability. I will change the code to contain the String address instead.
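Here is a hedged sketch of the proposed change: `SlotStatus` keeps the TaskExecutor's RPC address as a `String`, so the object stays `Serializable`. This is a simplified stand-in, not the PR's class. Plain `String` fields replace `SlotID` and `JobID`, and the `SerializationCheck.roundTrip` helper is hypothetical and exists only to show that Java serialization succeeds.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical, simplified SlotStatus carrying only serializable fields.
final class SlotStatus implements Serializable {
	private static final long serialVersionUID = 1L;

	private final String slotID;
	private final String jobID;               // null if the slot is free
	private final String taskExecutorAddress; // RPC address instead of a gateway object

	SlotStatus(String slotID, String jobID, String taskExecutorAddress) {
		this.slotID = slotID;
		this.jobID = jobID;
		this.taskExecutorAddress = taskExecutorAddress;
	}

	String getSlotID() { return slotID; }
	String getJobID() { return jobID; }
	String getTaskExecutorAddress() { return taskExecutorAddress; }
}

// Helper that serializes and deserializes a SlotStatus in memory.
final class SerializationCheck {
	static SlotStatus roundTrip(SlotStatus status) {
		try {
			ByteArrayOutputStream bytes = new ByteArrayOutputStream();
			try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
				out.writeObject(status);
			}
			try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
				return (SlotStatus) in.readObject();
			}
		} catch (IOException | ClassNotFoundException e) {
			throw new IllegalStateException("SlotStatus is not serializable", e);
		}
	}
}
```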




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467361#comment-15467361
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77629249
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -131,9 +149,16 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
 * @return Slot assignment
 */
@RpcMethod
-   public SlotAssignment requestSlot(SlotRequest slotRequest) {
-   System.out.println("SlotRequest: " + slotRequest);
-   return new SlotAssignment();
+   public SlotRequestRegistered requestSlot(SlotRequest slotRequest) {
+   final JobID jobId = slotRequest.getJobId();
+   final JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId);
+
+   if (jobMasterGateway != null) {
+   return slotManager.requestSlot(slotRequest);
+   } else {
+   LOG.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
+   return null;
--- End diff --

The rationale here was to simply ignore the request because the JobManager 
is not registered. You're right, it's probably better to reply with a meaningful 
answer.
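Here is a minimal sketch of the "meaningful answer" option. Instead of returning `null` for an unknown JobMaster, the ResourceManager returns an explicit rejection that carries a reason. All names here are hypothetical. `String` job IDs and addresses stand in for `JobID` and `JobMasterGateway`, and the call into the slot manager is reduced to a comment.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reply: either registered, or rejected with a reason.
final class SlotRequestResponse {
	final String jobID;
	final boolean registered;
	final String rejectionReason; // null when registered

	private SlotRequestResponse(String jobID, boolean registered, String rejectionReason) {
		this.jobID = jobID;
		this.registered = registered;
		this.rejectionReason = rejectionReason;
	}

	static SlotRequestResponse registered(String jobID) {
		return new SlotRequestResponse(jobID, true, null);
	}

	static SlotRequestResponse rejected(String jobID, String reason) {
		return new SlotRequestResponse(jobID, false, reason);
	}
}

// Stripped-down ResourceManager that only tracks registered JobMasters.
final class ResourceManagerSketch {
	private final Map<String, String> jobMasterAddresses = new HashMap<>();

	void registerJobMaster(String jobID, String address) {
		jobMasterAddresses.put(jobID, address);
	}

	SlotRequestResponse requestSlot(String jobID) {
		if (!jobMasterAddresses.containsKey(jobID)) {
			// Instead of returning null, tell the caller why the request was refused.
			return SlotRequestResponse.rejected(jobID, "JobMaster for job " + jobID + " is not registered");
		}
		// The real code would delegate to slotManager.requestSlot(slotRequest) here.
		return SlotRequestResponse.registered(jobID);
	}
}
```

Throwing an exception for the caller to handle would also avoid the `null`. An explicit reply type instead keeps the rejection on the normal response path, so the JobMaster cannot mistake silence for success.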




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467350#comment-15467350
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77628362
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java
 ---
@@ -15,11 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.runtime.highavailability;
 
-package org.apache.flink.runtime.resourcemanager;
+import java.util.UUID;
 
-import java.io.Serializable;
+/**
+ * Registry class to keep track of the current leader ID.
+ */
+public class LeaderIdRegistry {
--- End diff --

I created it so that it can be passed on to components that need to retrieve the 
current leader UUID. Passing on a single UUID reference wouldn't work because it 
would not reflect later leader changes.
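The rationale can be illustrated with a small sketch, assuming the leader ID is a `java.util.UUID`. Because `UUID` is immutable, a component that receives the value directly keeps a stale copy after a leader change. A component that holds the shared registry reads the current value instead. The class names are hypothetical.

```java
import java.util.UUID;

// Hypothetical minimal registry; only the shared-holder idea matters here.
final class LeaderIdRegistry {
	private volatile UUID leaderID;

	void setLeaderID(UUID id) { leaderID = id; }

	UUID getLeaderID() { return leaderID; }
}

// Copies the UUID at construction time and therefore never sees a new leader.
final class SnapshotComponent {
	private final UUID leaderID;

	SnapshotComponent(UUID leaderID) { this.leaderID = leaderID; }

	UUID currentLeader() { return leaderID; }
}

// Keeps the registry and asks it every time, so it always sees the latest leader.
final class RegistryComponent {
	private final LeaderIdRegistry registry;

	RegistryComponent(LeaderIdRegistry registry) { this.registry = registry; }

	UUID currentLeader() { return registry.getLeaderID(); }
}
```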




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467353#comment-15467353
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77628600
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -52,15 +58,28 @@
  * 
  */
 public class ResourceManager extends RpcEndpoint {
-   private final Map jobMasterGateways;
+
+   private final Logger LOG = LoggerFactory.getLogger(getClass());
--- End diff --

No particular reason other than I want to make sure future subclasses log 
with the correct class name.
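Here is a plain-Java illustration of the difference, without SLF4J. `getClass()` returns the runtime class, so a logger created with `LoggerFactory.getLogger(getClass())` in a base class is named after the concrete subclass. A static logger bound to the declaring class is always named after that class. `BaseEndpoint` and `CustomEndpoint` are hypothetical, and the strings only stand in for the logger names.

```java
// Stand-in for a base class such as ResourceManager.
class BaseEndpoint {
	// Name a static logger would get: LoggerFactory.getLogger(BaseEndpoint.class)
	static final String STATIC_LOGGER_NAME = BaseEndpoint.class.getName();

	// Name an instance logger would get: LoggerFactory.getLogger(getClass()).
	// getClass() is evaluated on the runtime object, so subclasses report their own name.
	final String instanceLoggerName = getClass().getName();
}

// A future subclass: its log lines would carry this class name.
class CustomEndpoint extends BaseEndpoint {
}
```

The trade-off is one logger lookup per instance instead of one per class. This is usually negligible, since SLF4J bindings typically cache loggers by name.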




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465061#comment-15465061
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2463
  
Thanks for your work @mxm. I've left some comments inline. I think this PR takes 
the implementation of the slot request logic another step in the right direction.




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465054#comment-15465054
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77524626
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java
 ---
@@ -15,11 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.runtime.highavailability;
 
-package org.apache.flink.runtime.resourcemanager;
+import java.util.UUID;
 
-import java.io.Serializable;
+/**
+ * Registry class to keep track of the current leader ID.
+ */
+public class LeaderIdRegistry {
--- End diff --

Why do you create a registry for a single field?




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465044#comment-15465044
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77524466
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 ---

[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465037#comment-15465037
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77524239
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
 ---
@@ -46,13 +47,21 @@
/** if the slot is allocated, jobId identify which job this slot is allocated to; else, jobId is null */
private final JobID jobID;
 
-   public SlotStatus(SlotID slotID, ResourceProfile profiler) {
-   this(slotID, profiler, null, null);
+   /** Gateway to the TaskManager which reported the SlotStatus */
+   private final TaskExecutorGateway taskExecutorGateway;
--- End diff --

The `SlotStatus` is no longer serializable with this field. Where does the 
`SlotStatus` come from? If it's coming from the `TaskExecutor`, then the 
`taskExecutorGateway` has to be retrieved on the `ResourceManager` side.
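Here is a sketch of retrieving the gateway on the ResourceManager side. It assumes the TaskExecutor's gateway is obtained once, when the TaskExecutor registers, and is then keyed by its RPC address. The `SlotStatus` would then only need to carry that address. `TaskExecutorGatewayStub` and `TaskExecutorRegistry` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for TaskExecutorGateway; only its address matters here.
interface TaskExecutorGatewayStub {
	String address();
}

// ResourceManager-side lookup from the address in a SlotStatus to a gateway.
final class TaskExecutorRegistry {
	private final Map<String, TaskExecutorGatewayStub> gatewaysByAddress = new HashMap<>();

	// Called when a TaskExecutor registers and its gateway has been obtained.
	void register(TaskExecutorGatewayStub gateway) {
		gatewaysByAddress.put(gateway.address(), gateway);
	}

	// Empty if the reporting TaskExecutor is unknown, e.g. not (yet) registered.
	Optional<TaskExecutorGatewayStub> resolve(String address) {
		return Optional.ofNullable(gatewaysByAddress.get(address));
	}
}
```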




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465028#comment-15465028
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77523985
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 ---
@@ -32,4 +33,11 @@
// 

 
void notifyOfNewResourceManagerLeader(String address, UUID resourceManagerLeaderId);
+
+   /**
+* Send by the ResourceManager to the TaskExecutor
+* @param allocationID id for the request
+* @param resourceManagerLeaderID current leader id of the ResourceManager
+*/
+   void requestSlot(AllocationID allocationID, UUID resourceManagerLeaderID);
--- End diff --

How is the confirmation of the `TaskExecutor` sent back to the 
`SlotManager`? Would it make sense to send it back via the return value of this 
method?




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465023#comment-15465023
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77523806
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 ---

[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465005#comment-15465005
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77522737
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -131,9 +149,16 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
 * @return Slot assignment
 */
@RpcMethod
-   public SlotAssignment requestSlot(SlotRequest slotRequest) {
-   System.out.println("SlotRequest: " + slotRequest);
-   return new SlotAssignment();
+   public SlotRequestRegistered requestSlot(SlotRequest slotRequest) {
+   final JobID jobId = slotRequest.getJobId();
+   final JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId);
+
+   if (jobMasterGateway != null) {
+   return slotManager.requestSlot(slotRequest);
+   } else {
+   LOG.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
+   return null;
--- End diff --

Not sure whether we should return `null` here, a negative `SlotRequestRegistered` response, or throw an exception to be handled by the caller. Why did you choose `null`?
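To make the trade-off in this review comment concrete, here is a hedged sketch of the "negative response" option: instead of `null`, `requestSlot` returns a reply object that states whether the request was registered and, if not, why. `NegativeReplySketch`, `SlotReply`, `ResourceManagerSketch` and the `String` job ids are hypothetical simplifications, not the types in the pull request.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: an explicit negative reply instead of returning null. */
public class NegativeReplySketch {

    /** Reply carrying either acceptance or a reason for rejection. */
    public static final class SlotReply {
        private final boolean registered;
        private final String reason;

        private SlotReply(boolean registered, String reason) {
            this.registered = registered;
            this.reason = reason;
        }

        static SlotReply registered() { return new SlotReply(true, null); }
        static SlotReply rejected(String reason) { return new SlotReply(false, reason); }

        boolean isRegistered() { return registered; }
        String reason() { return reason; }
    }

    /** Simplified RM that only tracks which JobMasters are registered, keyed by job id. */
    public static final class ResourceManagerSketch {
        private final Map<String, String> jobMasterAddresses = new HashMap<>();

        void registerJobMaster(String jobId, String address) {
            jobMasterAddresses.put(jobId, address);
        }

        SlotReply requestSlot(String jobId) {
            if (!jobMasterAddresses.containsKey(jobId)) {
                // The caller gets a value it must inspect, not a null it might dereference.
                return SlotReply.rejected("unknown JobMaster for job " + jobId);
            }
            // A real implementation would forward the request to the SlotManager here.
            return SlotReply.registered();
        }
    }

    public static void main(String[] args) {
        ResourceManagerSketch rm = new ResourceManagerSketch();
        rm.registerJobMaster("job-1", "/jm1");
        System.out.println(rm.requestSlot("job-1").isRegistered()); // true
        System.out.println(rm.requestSlot("job-2").reason());       // unknown JobMaster for job job-2
    }
}
```

Compared with `null`, the reply makes rejection part of the method's contract. Throwing an exception, the third option, would move the handling into the caller's error path instead.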




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465000#comment-15465000
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77522339
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -52,15 +58,28 @@
  * 
  */
 public class ResourceManager extends RpcEndpoint {
-   private final Map jobMasterGateways;
+
+   private final Logger LOG = LoggerFactory.getLogger(getClass());
--- End diff --

Why not make it static?
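For reference, the two declarations differ in a small way: a `static final` logger is obtained once per class and named after the declaring class, while `getLogger(getClass())` on an instance field performs the lookup for every instance and names the logger after the runtime class, so a subclass logs under its own name. The sketch uses `java.util.logging` only so it runs without dependencies; the code under review uses SLF4J's `LoggerFactory`, where the idiom is the same. `StaticLoggerSketch`, `Base` and `Sub` are hypothetical names.

```java
import java.util.logging.Logger;

public class StaticLoggerSketch {

    static class Base {
        /** Idiom suggested in the review: one logger per class, named after the declaring class. */
        static final Logger LOG = Logger.getLogger(Base.class.getName());

        /** Variant from the diff: looked up per instance, named after the runtime class. */
        final Logger instanceLog = Logger.getLogger(getClass().getName());
    }

    /** Hypothetical subclass used only to show the naming difference. */
    static class Sub extends Base {}

    public static void main(String[] args) {
        Sub sub = new Sub();
        System.out.println(Base.LOG.getName());        // StaticLoggerSketch$Base
        System.out.println(sub.instanceLog.getName()); // StaticLoggerSketch$Sub
    }
}
```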




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15458692#comment-15458692
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2463
  
CC @tillrohrmann @StephanEwen 




[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15458686#comment-15458686
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/2463

[FLINK-4538][FLINK-4348] ResourceManager slot allocation protocol

This implements and tests the ResourceManager part of the protocol for slot allocation.

- associates JobMasters with JobID instead of InstanceID
- adds TaskExecutorGateway to slot and notify from SlotManager
- adds SlotManager as RM constructor parameter
- adds LeaderIdRetriever to keep track of the leader id

- tests the interaction JM->RM requestSlot
- tests the interaction RM->TM requestSlot
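Passing the SlotManager through the ResourceManager constructor is what lets the test in this pull request hand in a spied `TestingSlotManager` and verify the JM->RM routing. Below is a dependency-free sketch of that pattern, with a hand-written recording fake in place of Mockito. `InjectedSlotManagerSketch`, `SlotManagerApi`, `RecordingSlotManager` and `RoutingResourceManager` are hypothetical names, and `UUID` stands in for Flink's `JobID`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

public class InjectedSlotManagerSketch {

    /** Hypothetical minimal SlotManager contract. */
    interface SlotManagerApi {
        void requestSlot(UUID jobId);
    }

    /** Test double that records every call, similar in spirit to a Mockito spy. */
    static final class RecordingSlotManager implements SlotManagerApi {
        final List<UUID> requests = new ArrayList<>();

        @Override
        public void requestSlot(UUID jobId) {
            requests.add(jobId);
        }
    }

    /** The RM receives its SlotManager via the constructor instead of creating it. */
    static final class RoutingResourceManager {
        private final SlotManagerApi slotManager;
        private final Set<UUID> registeredJobs = new HashSet<>();

        RoutingResourceManager(SlotManagerApi slotManager) {
            this.slotManager = slotManager;
        }

        void registerJobMaster(UUID jobId) {
            registeredJobs.add(jobId);
        }

        /** Forwards requests from registered JobMasters; returns false otherwise. */
        boolean requestSlot(UUID jobId) {
            if (!registeredJobs.contains(jobId)) {
                return false;
            }
            slotManager.requestSlot(jobId);
            return true;
        }
    }

    public static void main(String[] args) {
        RecordingSlotManager slotManager = new RecordingSlotManager();
        RoutingResourceManager rm = new RoutingResourceManager(slotManager);

        UUID knownJob = UUID.randomUUID();
        rm.registerJobMaster(knownJob);
        rm.requestSlot(knownJob);
        rm.requestSlot(UUID.randomUUID()); // unknown job: not forwarded

        System.out.println(slotManager.requests.size()); // 1
    }
}
```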

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink flip-6

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2463.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2463


commit 213f4ee6a30bd87e9e04c5a4b22022e0636db9e9
Author: Maximilian Michels 
Date:   2016-09-01T14:53:31Z

[FLINK-4538][FLINK-4348] ResourceManager slot allocation protocol

- associates JobMasters with JobID instead of InstanceID
- adds TaskExecutorGateway to slot
- adds SlotManager as RM constructor parameter
- adds LeaderIdRetriever to keep track of the leader id

- tests the interaction JM->RM requestSlot
- tests the interaction RM->TM requestSlot



