[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15506081#comment-15506081 ] ASF GitHub Bot commented on FLINK-4538: --- Github user mxm closed the pull request at: https://github.com/apache/flink/pull/2463 > Implement slot allocation protocol with JobMaster > - > > Key: FLINK-4538 > URL: https://issues.apache.org/jira/browse/FLINK-4538 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Maximilian Michels >Assignee: Maximilian Michels > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15506080#comment-15506080 ] ASF GitHub Bot commented on FLINK-4538: --- Github user mxm commented on the issue: https://github.com/apache/flink/pull/2463 Merged to continue development in other places. Thanks for the comments!
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477565#comment-15477565 ] ASF GitHub Bot commented on FLINK-4538: --- Github user mxm commented on the issue: https://github.com/apache/flink/pull/2463 I've rebased the pull request and incorporated your suggestions.
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477560#comment-15477560 ] ASF GitHub Bot commented on FLINK-4538: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r78212144 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java --- @@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway resourceManagerGateway) { * RPC's main thread to avoid race condition). * * @param request The detailed request of the slot +* @return SlotRequestRegistered The confirmation message to be send to the caller */ - public void requestSlot(final SlotRequest request) { + public SlotRequestRegistered requestSlot(final SlotRequest request) { + final AllocationID allocationId = request.getAllocationId(); if (isRequestDuplicated(request)) { - LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId()); - return; + LOG.warn("Duplicated slot request, AllocationID:{}", allocationId); + return null; } // try to fulfil the request with current free slots - ResourceSlot slot = chooseSlotToUse(request, freeSlots); + final ResourceSlot slot = chooseSlotToUse(request, freeSlots); if (slot != null) { LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(), - request.getAllocationId(), request.getJobId()); + allocationId, request.getJobId()); // record this allocation in bookkeeping - allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId()); + allocationMap.addAllocation(slot.getSlotId(), allocationId); // remove selected slot from free pool freeSlots.remove(slot.getSlotId()); - // TODO: send slot request to TaskManager + slot.getTaskExecutorGateway() + .requestSlot(allocationId, leaderIdRegistry.getLeaderID()); --- End diff -- Thank you for your comments @beyond1920. Your observations are correct. 
I've skipped this part of the implementation and wanted to address it next.
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477252#comment-15477252 ] ASF GitHub Bot commented on FLINK-4538: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r78194763 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java --- @@ -15,11 +15,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.flink.runtime.highavailability; -package org.apache.flink.runtime.resourcemanager; +import java.util.UUID; -import java.io.Serializable; +/** + * Registry class to keep track of the current leader ID. + */ +public class LeaderIdRegistry { --- End diff -- These are valid points; I will change the code to use the `LeaderRetrievalListener` instead.
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15473326#comment-15473326 ] ASF GitHub Bot commented on FLINK-4538: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77970827 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -52,15 +58,28 @@ * */ public class ResourceManager extends RpcEndpoint { - private final MapjobMasterGateways; + + private final Logger LOG = LoggerFactory.getLogger(getClass()); --- End diff -- Good point. Will use that one instead.
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15472686#comment-15472686 ] ASF GitHub Bot commented on FLINK-4538: --- Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77943543 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -52,15 +58,28 @@ * */ public class ResourceManager extends RpcEndpoint { - private final MapjobMasterGateways; + + private final Logger LOG = LoggerFactory.getLogger(getClass()); --- End diff -- There is already a protected log field in RpcEndpoint. Why not use that instead?
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470114#comment-15470114 ] ASF GitHub Bot commented on FLINK-4538: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77787126 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java --- @@ -15,11 +15,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.flink.runtime.highavailability; -package org.apache.flink.runtime.resourcemanager; +import java.util.UUID; -import java.io.Serializable; +/** + * Registry class to keep track of the current leader ID. + */ +public class LeaderIdRegistry { --- End diff -- The class has some docs but, as you can see from my initial question, its purpose was not clear to me. Yes, I actually thought about marking `leaderSessionID` `volatile`. Given the interface of this class, every component that holds a reference to this registry is allowed to change the leader session ID. This can be problematic because components other than the `ResourceManager` should only be allowed to retrieve the leader session ID. I'm also wondering whether we need to notify the components about a new leader session ID anyway. For example, the `SlotManager` should probably free its registered slots when it loses the leadership. Wouldn't these calls be suitable to transmit the current leader session ID?
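The two concerns raised in the comment above (only one component should be able to change the leader session ID, and other components may need to be told when it changes) can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code from the pull request: `LeaderSessionListener`, `LeaderSessionIdView`, `LeaderSessionIdHolder` and `SlotTracker` are hypothetical names, the listener is not Flink's `LeaderRetrievalListener`, and slots are represented by plain strings.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical callback; a null ID is used here to signal that leadership was lost. */
interface LeaderSessionListener {
    void onLeaderSessionChanged(UUID newLeaderSessionId);
}

/** Read-only view that can be handed to components that must not change the ID. */
interface LeaderSessionIdView {
    UUID getLeaderSessionId();
}

/** Holder owned by a single component (the ResourceManager in the discussion). */
class LeaderSessionIdHolder implements LeaderSessionIdView {
    // volatile so that readers on other threads see the latest ID without locking
    private volatile UUID leaderSessionId;
    private final List<LeaderSessionListener> listeners = new CopyOnWriteArrayList<>();

    void addListener(LeaderSessionListener listener) {
        listeners.add(listener);
    }

    /** Only the owner is meant to call this; others receive a LeaderSessionIdView. */
    void setLeaderSessionId(UUID newLeaderSessionId) {
        leaderSessionId = newLeaderSessionId;
        for (LeaderSessionListener listener : listeners) {
            listener.onLeaderSessionChanged(newLeaderSessionId);
        }
    }

    @Override
    public UUID getLeaderSessionId() {
        return leaderSessionId;
    }
}

/** Simplified stand-in for a SlotManager that frees its slots when leadership is lost. */
class SlotTracker implements LeaderSessionListener {
    private final Set<String> registeredSlots = new HashSet<>();
    private UUID currentLeaderSessionId;

    void registerSlot(String slotId) {
        registeredSlots.add(slotId);
    }

    int slotCount() {
        return registeredSlots.size();
    }

    UUID currentLeaderSessionId() {
        return currentLeaderSessionId;
    }

    @Override
    public void onLeaderSessionChanged(UUID newLeaderSessionId) {
        if (newLeaderSessionId == null) {
            // leadership lost: drop all registered slots
            registeredSlots.clear();
        }
        currentLeaderSessionId = newLeaderSessionId;
    }
}
```

In this sketch the notification call doubles as the way to transmit the current leader session ID, so a component such as the slot tracker never needs write access to the holder.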
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15470065#comment-15470065 ] ASF GitHub Bot commented on FLINK-4538: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77783497 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java --- @@ -15,11 +15,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.flink.runtime.highavailability; -package org.apache.flink.runtime.resourcemanager; +import java.util.UUID; -import java.io.Serializable; +/** + * Registry class to keep track of the current leader ID. + */ +public class LeaderIdRegistry { --- End diff -- What exactly do you mean? The class is thread-safe and documented (though documentation can be improved). There is no need for locking. Do you mean marking the leaderSessionID `volatile`? It should be fine if leader changes propagate lazily.
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469801#comment-15469801 ] ASF GitHub Bot commented on FLINK-4538: --- Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77769044 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java --- @@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway resourceManagerGateway) { * RPC's main thread to avoid race condition). * * @param request The detailed request of the slot +* @return SlotRequestRegistered The confirmation message to be send to the caller */ - public void requestSlot(final SlotRequest request) { + public SlotRequestRegistered requestSlot(final SlotRequest request) { + final AllocationID allocationId = request.getAllocationId(); if (isRequestDuplicated(request)) { - LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId()); - return; + LOG.warn("Duplicated slot request, AllocationID:{}", allocationId); + return null; } // try to fulfil the request with current free slots - ResourceSlot slot = chooseSlotToUse(request, freeSlots); + final ResourceSlot slot = chooseSlotToUse(request, freeSlots); if (slot != null) { LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(), - request.getAllocationId(), request.getJobId()); + allocationId, request.getJobId()); // record this allocation in bookkeeping - allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId()); + allocationMap.addAllocation(slot.getSlotId(), allocationId); // remove selected slot from free pool freeSlots.remove(slot.getSlotId()); - // TODO: send slot request to TaskManager + slot.getTaskExecutorGateway() + .requestSlot(allocationId, leaderIdRegistry.getLeaderID()); --- End diff -- There exists 3 following possibilities of the response from taskExecutor: 1. 
Ack request, which means the taskExecutor gives the slot to the specified jobMaster as expected. 2. Decline request, if the slot is already occupied by another AllocationID. 3. Timeout, which could be caused by loss of the request or response message, or by slow network transfer. In the first case, the SlotManager needs to do nothing. In the second and third cases, however, the SlotManager should first verify and clear all previous allocation information for this slot request, and then try to find a suitable slot for the request again. I think we should add logic to handle these three possible responses from the taskExecutor.
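The three cases described above could be handled along the following lines. This is only a sketch under simplifying assumptions: `SlotResponse` and `SlotBookkeeping` are hypothetical names, plain strings stand in for `SlotID` and `AllocationID`, and a slot that was declined or timed out is not returned to the free pool because its real state is unknown in this simplified model.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical outcome of a slot request sent to a TaskExecutor. */
enum SlotResponse { ACK, DECLINE, TIMEOUT }

/** Simplified stand-in for the SlotManager's bookkeeping. */
class SlotBookkeeping {
    // insertion-ordered so that the choice of slot is deterministic in this sketch
    private final Set<String> freeSlots = new LinkedHashSet<>();
    private final Map<String, String> allocationBySlot = new HashMap<>();

    void addFreeSlot(String slotId) {
        freeSlots.add(slotId);
    }

    /** Assigns the first free slot to the allocation, or returns null if none is free. */
    String allocate(String allocationId) {
        if (freeSlots.isEmpty()) {
            return null;
        }
        String slotId = freeSlots.iterator().next();
        freeSlots.remove(slotId);
        allocationBySlot.put(slotId, allocationId);
        return slotId;
    }

    /**
     * Reacts to the TaskExecutor's response and returns the slot that is assigned
     * to the allocation afterwards (null if no other slot could be found).
     */
    String handleResponse(String slotId, String allocationId, SlotResponse response) {
        switch (response) {
            case ACK:
                // case 1: nothing to do, the bookkeeping is already correct
                return slotId;
            case DECLINE:
            case TIMEOUT:
                // cases 2 and 3: verify that the entry still belongs to this request,
                // clear it, then try to find another slot for the same request
                allocationBySlot.remove(slotId, allocationId);
                return allocate(allocationId);
            default:
                throw new IllegalArgumentException("Unknown response: " + response);
        }
    }

    String allocationOf(String slotId) {
        return allocationBySlot.get(slotId);
    }
}
```

A real implementation would also need to decide when a declined or timed-out slot becomes usable again, for example after the next slot report from the TaskExecutor; that part is left out here.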
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15469776#comment-15469776 ] ASF GitHub Bot commented on FLINK-4538: --- Github user beyond1920 commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77768256 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java --- @@ -89,32 +93,37 @@ public SlotManager(ResourceManagerGateway resourceManagerGateway) { * RPC's main thread to avoid race condition). * * @param request The detailed request of the slot +* @return SlotRequestRegistered The confirmation message to be send to the caller */ - public void requestSlot(final SlotRequest request) { + public SlotRequestRegistered requestSlot(final SlotRequest request) { + final AllocationID allocationId = request.getAllocationId(); if (isRequestDuplicated(request)) { - LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId()); - return; + LOG.warn("Duplicated slot request, AllocationID:{}", allocationId); + return null; } // try to fulfil the request with current free slots - ResourceSlot slot = chooseSlotToUse(request, freeSlots); + final ResourceSlot slot = chooseSlotToUse(request, freeSlots); if (slot != null) { LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(), - request.getAllocationId(), request.getJobId()); + allocationId, request.getJobId()); // record this allocation in bookkeeping - allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId()); + allocationMap.addAllocation(slot.getSlotId(), allocationId); // remove selected slot from free pool freeSlots.remove(slot.getSlotId()); - // TODO: send slot request to TaskManager + slot.getTaskExecutorGateway() + .requestSlot(allocationId, leaderIdRegistry.getLeaderID()); --- End diff -- ResourceManager keeps a relationship between resourceID and TaskExecutorGateway. 
Maybe we could fetch TaskExecutorGateway by resourceID using ResourceManager here?
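The lookup suggested above might look roughly like this. It is a hedged sketch, not the ResourceManager's actual data structure: `ExecutorGateway` and `GatewayRegistry` are hypothetical names, and strings replace `ResourceID`, `AllocationID` and the leader ID.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for TaskExecutorGateway. */
interface ExecutorGateway {
    void requestSlot(String allocationId, String leaderId);
}

/** Sketch of a resource ID to gateway mapping kept by the resource manager. */
class GatewayRegistry {
    private final Map<String, ExecutorGateway> gatewaysByResourceId = new ConcurrentHashMap<>();

    void register(String resourceId, ExecutorGateway gateway) {
        gatewaysByResourceId.put(resourceId, gateway);
    }

    void unregister(String resourceId) {
        gatewaysByResourceId.remove(resourceId);
    }

    /** Returns the gateway for the resource, or an empty Optional if it is unknown. */
    Optional<ExecutorGateway> lookup(String resourceId) {
        return Optional.ofNullable(gatewaysByResourceId.get(resourceId));
    }
}
```

With such a registry the slot itself would not need to carry a gateway reference; the caller would resolve it from the slot's resource ID at the time the request is sent and could handle the case where the TaskExecutor has meanwhile disappeared.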
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467602#comment-15467602 ] ASF GitHub Bot commented on FLINK-4538: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77650617 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.resourcemanager.slotmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.highavailability.LeaderIdRegistry; +import org.apache.flink.runtime.highavailability.NonHaServices; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.resourcemanager.JobMasterRegistration; +import org.apache.flink.runtime.resourcemanager.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; + +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +public class SlotProtocolTest { + + private static TestingRpcService testRpcService; + + @BeforeClass + public static void beforeClass() { + 
testRpcService = new TestingRpcService(); + + } + + @AfterClass + public static void afterClass() { + testRpcService.stopService(); + testRpcService = null; + } + + @Before + public void beforeTest(){ + testRpcService.clearGateways(); + } + + /** +* Tests whether +* 1) SlotRequest is routed to the SlotManager +* 2) SlotRequest leads to a container allocation +* 3) SlotRequest is confirmed +* 4) Slot becomes available and TaskExecutor gets a SlotRequest +*/ + @Test + public void testSlotsUnavailableRequest() throws Exception { + final String rmAddress = "/rm1"; + final String jmAddress = "/jm1"; + final JobID jobID = new JobID(); + + testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class)); + + + TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager()); + ResourceManager resourceManager = + new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager); + resourceManager.start(); + + Future registrationFuture = + resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress,
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467612#comment-15467612 ] ASF GitHub Bot commented on FLINK-4538: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77651399 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -52,15 +58,28 @@ * */ public class ResourceManager extends RpcEndpoint { - private final MapjobMasterGateways; + + private final Logger LOG = LoggerFactory.getLogger(getClass()); --- End diff -- But then this field should probably be marked as `protected` instead of `private`.
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467609#comment-15467609 ] ASF GitHub Bot commented on FLINK-4538: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77651201 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.resourcemanager.slotmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.highavailability.LeaderIdRegistry; +import org.apache.flink.runtime.highavailability.NonHaServices; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.resourcemanager.JobMasterRegistration; +import org.apache.flink.runtime.resourcemanager.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; + +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +public class SlotProtocolTest { + + private static TestingRpcService testRpcService; + + @BeforeClass + public static void beforeClass() { + 
testRpcService = new TestingRpcService(); + + } + + @AfterClass + public static void afterClass() { + testRpcService.stopService(); + testRpcService = null; + } + + @Before + public void beforeTest(){ + testRpcService.clearGateways(); + } + + /** +* Tests whether +* 1) SlotRequest is routed to the SlotManager +* 2) SlotRequest leads to a container allocation +* 3) SlotRequest is confirmed +* 4) Slot becomes available and TaskExecutor gets a SlotRequest +*/ + @Test + public void testSlotsUnavailableRequest() throws Exception { + final String rmAddress = "/rm1"; + final String jmAddress = "/jm1"; + final JobID jobID = new JobID(); + + testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class)); + + + TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager()); + ResourceManager resourceManager = + new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager); + resourceManager.start(); + + Future registrationFuture = + resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress,
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467599#comment-15467599 ] ASF GitHub Bot commented on FLINK-4538: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77650487 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.resourcemanager.slotmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.highavailability.LeaderIdRegistry; +import org.apache.flink.runtime.highavailability.NonHaServices; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.resourcemanager.JobMasterRegistration; +import org.apache.flink.runtime.resourcemanager.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; + +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +public class SlotProtocolTest { --- End diff -- Should extend `TestLogger` > Implement slot allocation protocol with JobMaster > - > > Key: 
FLINK-4538 > URL: https://issues.apache.org/jira/browse/FLINK-4538 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Maximilian Michels >Assignee: Maximilian Michels > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467596#comment-15467596 ] ASF GitHub Bot commented on FLINK-4538: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77650313 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java --- @@ -15,11 +15,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.flink.runtime.highavailability; -package org.apache.flink.runtime.resourcemanager; +import java.util.UUID; -import java.io.Serializable; +/** + * Registry class to keep track of the current leader ID. + */ +public class LeaderIdRegistry { --- End diff -- Alright, but then this class should be made thread-safe and the docs should state its purpose.
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467390#comment-15467390 ] ASF GitHub Bot commented on FLINK-4538: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77630918 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.resourcemanager.slotmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.highavailability.LeaderIdRegistry; +import org.apache.flink.runtime.highavailability.NonHaServices; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.resourcemanager.JobMasterRegistration; +import org.apache.flink.runtime.resourcemanager.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; + +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +public class SlotProtocolTest { + + private static TestingRpcService testRpcService; + + @BeforeClass + public static void beforeClass() { + 
testRpcService = new TestingRpcService(); + + } + + @AfterClass + public static void afterClass() { + testRpcService.stopService(); + testRpcService = null; + } + + @Before + public void beforeTest(){ + testRpcService.clearGateways(); + } + + /** +* Tests whether +* 1) SlotRequest is routed to the SlotManager +* 2) SlotRequest leads to a container allocation +* 3) SlotRequest is confirmed +* 4) Slot becomes available and TaskExecutor gets a SlotRequest +*/ + @Test + public void testSlotsUnavailableRequest() throws Exception { + final String rmAddress = "/rm1"; + final String jmAddress = "/jm1"; + final JobID jobID = new JobID(); + + testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class)); + + + TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager()); + ResourceManager resourceManager = + new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager); + resourceManager.start(); + + Future registrationFuture = + resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID));
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467382#comment-15467382 ] ASF GitHub Bot commented on FLINK-4538: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77630351 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467379#comment-15467379 ]

ASF GitHub Bot commented on FLINK-4538:
---

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r7763

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java ---
    @@ -32,4 +33,11 @@
         // void notifyOfNewResourceManagerLeader(String address, UUID resourceManagerLeaderId);
    +
    +    /**
    +     * Send by the ResourceManager to the TaskExecutor
    +     * @param allocationID id for the request
    +     * @param resourceManagerLeaderID current leader id of the ResourceManager
    +     */
    +    void requestSlot(AllocationID allocationID, UUID resourceManagerLeaderID);
    --- End diff --

    As of now, this is just a stub but we will have to acknowledge the message. Will change the signature to make that clear.

> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
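A signature that makes the acknowledgement explicit could look roughly like the sketch below. Everything here is an assumption for illustration: the names `SlotRequestAck`, `TaskExecutorGatewaySketch` and `TaskExecutorSketch` are hypothetical, the allocation ID is reduced to a String, and `CompletableFuture` stands in for whatever future type the RPC layer actually uses.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/** Hypothetical acknowledgement sent back to the ResourceManager side. */
enum SlotRequestAck { ACCEPTED, REJECTED_WRONG_LEADER }

/** Hypothetical gateway: the reply travels back as the method's return value. */
interface TaskExecutorGatewaySketch {
    CompletableFuture<SlotRequestAck> requestSlot(String allocationId, UUID resourceManagerLeaderId);
}

/** Toy TaskExecutor that only accepts requests from the leader it knows. */
class TaskExecutorSketch implements TaskExecutorGatewaySketch {

    private final UUID knownLeaderId;

    TaskExecutorSketch(UUID knownLeaderId) {
        this.knownLeaderId = knownLeaderId;
    }

    @Override
    public CompletableFuture<SlotRequestAck> requestSlot(String allocationId, UUID resourceManagerLeaderId) {
        // Reject requests carrying a stale or foreign leader ID.
        SlotRequestAck ack = knownLeaderId.equals(resourceManagerLeaderId)
            ? SlotRequestAck.ACCEPTED
            : SlotRequestAck.REJECTED_WRONG_LEADER;
        return CompletableFuture.completedFuture(ack);
    }
}
```

With a return value, the caller can tell a lost message (the future times out) from an explicit rejection, which a `void` method cannot express.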
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467376#comment-15467376 ]

ASF GitHub Bot commented on FLINK-4538:
---

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77629858

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java ---
    @@ -46,13 +47,21 @@
         /** if the slot is allocated, jobId identify which job this slot is allocated to; else, jobId is null */
         private final JobID jobID;
    -    public SlotStatus(SlotID slotID, ResourceProfile profiler) {
    -        this(slotID, profiler, null, null);
    +    /** Gateway to the TaskManager which reported the SlotStatus */
    +    private final TaskExecutorGateway taskExecutorGateway;
    --- End diff --

    It comes with the SlotReport from the TaskExecutor. Yes, it breaks Serializable. Will change the code to contain the String address instead.

> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
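The proposed change, carrying the TaskExecutor's address as a String instead of a gateway object, can be illustrated as follows. This is a simplified sketch under stated assumptions: `SlotStatusSketch` and its two String fields are hypothetical stand-ins for the real `SlotStatus`, and the `roundTrip` helper exists only to show that the object survives Java serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical, reduced SlotStatus: only serializable fields, no gateway. */
class SlotStatusSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String slotId;
    /** RPC address of the reporting TaskExecutor; the receiver resolves the gateway itself. */
    private final String taskExecutorAddress;

    SlotStatusSketch(String slotId, String taskExecutorAddress) {
        this.slotId = slotId;
        this.taskExecutorAddress = taskExecutorAddress;
    }

    String getSlotId() { return slotId; }

    String getTaskExecutorAddress() { return taskExecutorAddress; }

    /** Serializes and deserializes the status, as a remote transfer would. */
    static SlotStatusSketch roundTrip(SlotStatusSketch status) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(status);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SlotStatusSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("SlotStatusSketch should be serializable", e);
        }
    }
}
```

The ResourceManager side would then look up or connect to the gateway from the address, which matches the reviewer's point that the gateway has to be retrieved on the receiving side.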
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467361#comment-15467361 ]

ASF GitHub Bot commented on FLINK-4538:
---

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77629249

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -131,9 +149,16 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
          * @return Slot assignment
          */
         @RpcMethod
    -    public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -        System.out.println("SlotRequest: " + slotRequest);
    -        return new SlotAssignment();
    +    public SlotRequestRegistered requestSlot(SlotRequest slotRequest) {
    +        final JobID jobId = slotRequest.getJobId();
    +        final JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId);
    +
    +        if (jobMasterGateway != null) {
    +            return slotManager.requestSlot(slotRequest);
    +        } else {
    +            LOG.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
    +            return null;
    --- End diff --

    The rationale here was to simply ignore this request because the JobManager is not registered. You're right, probably better to reply with a meaningful answer.

> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
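A "meaningful answer" in place of `null` could be modelled as a small reply hierarchy, as in the sketch below. The types `SlotRequestReplySketch`, `SlotRequestRegisteredSketch`, `SlotRequestRejectedSketch` and `ResourceManagerSketch` are hypothetical and use String IDs for brevity; only the idea of a negative response comes from the review discussion.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical base type for replies to a slot request. */
abstract class SlotRequestReplySketch {
    final String allocationId;

    SlotRequestReplySketch(String allocationId) {
        this.allocationId = allocationId;
    }
}

/** Positive reply: the request was handed on to the SlotManager. */
final class SlotRequestRegisteredSketch extends SlotRequestReplySketch {
    SlotRequestRegisteredSketch(String allocationId) {
        super(allocationId);
    }
}

/** Negative reply: carries a reason instead of silently returning null. */
final class SlotRequestRejectedSketch extends SlotRequestReplySketch {
    final String reason;

    SlotRequestRejectedSketch(String allocationId, String reason) {
        super(allocationId);
        this.reason = reason;
    }
}

/** Toy ResourceManager that only accepts requests for registered jobs. */
class ResourceManagerSketch {
    private final Set<String> registeredJobIds = new HashSet<>();

    void registerJobMaster(String jobId) {
        registeredJobIds.add(jobId);
    }

    SlotRequestReplySketch requestSlot(String jobId, String allocationId) {
        if (registeredJobIds.contains(jobId)) {
            return new SlotRequestRegisteredSketch(allocationId);
        }
        return new SlotRequestRejectedSketch(allocationId, "No JobMaster registered for job " + jobId);
    }
}
```

Compared with throwing an exception, an explicit rejection keeps the unknown-job case on the normal reply path, so the caller can decide whether to re-register and retry.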
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467350#comment-15467350 ]

ASF GitHub Bot commented on FLINK-4538:
---

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77628362

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java ---
    @@ -15,11 +15,27 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    +package org.apache.flink.runtime.highavailability;
    -package org.apache.flink.runtime.resourcemanager;
    +import java.util.UUID;
    -import java.io.Serializable;
    +/**
    + * Registry class to keep track of the current leader ID.
    + */
    +public class LeaderIdRegistry {
    --- End diff --

    In order to pass it on to components who want to retrieve the current leader UUID. Passing on only a single reference wouldn't work.

> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467353#comment-15467353 ]

ASF GitHub Bot commented on FLINK-4538:
---

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77628600

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -52,15 +58,28 @@
      *
      */
     public class ResourceManager extends RpcEndpoint {
    -    private final MapjobMasterGateways;
    +
    +    private final Logger LOG = LoggerFactory.getLogger(getClass());
    --- End diff --

    No particular reason other than I want to make sure future subclasses log with the correct class name.

> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
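The difference between the two logger styles can be shown in a few lines. The sketch below is an illustration only: it uses `java.util.logging.Logger` as a stand-in for SLF4J so that it runs without extra dependencies, and the classes `BaseEndpointSketch` and `CustomEndpointSketch` are hypothetical.

```java
import java.util.logging.Logger;

class BaseEndpointSketch {

    // Static logger: created once and always named after the declaring class.
    static final Logger STATIC_LOG = Logger.getLogger(BaseEndpointSketch.class.getName());

    // Instance logger: getClass() resolves to the runtime class,
    // so a subclass instance logs under the subclass name.
    final Logger log = Logger.getLogger(getClass().getName());
}

/** Subclass with no logger of its own. */
class CustomEndpointSketch extends BaseEndpointSketch {
}
```

For a `CustomEndpointSketch` instance, `log.getName()` ends in `CustomEndpointSketch`, while `STATIC_LOG.getName()` still ends in `BaseEndpointSketch`. The instance field costs one logger lookup per object, which is the usual argument for the static form.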
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465061#comment-15465061 ] ASF GitHub Bot commented on FLINK-4538: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2463 Thanks for your work @mxm. I've had some comments which you can find inline. I think the implementation of the slot request logic made another step in the right direction with this PR. > Implement slot allocation protocol with JobMaster > - > > Key: FLINK-4538 > URL: https://issues.apache.org/jira/browse/FLINK-4538 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Maximilian Michels >Assignee: Maximilian Michels > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465054#comment-15465054 ]

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77524626

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java ---
    @@ -15,11 +15,27 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    +package org.apache.flink.runtime.highavailability;
    -package org.apache.flink.runtime.resourcemanager;
    +import java.util.UUID;
    -import java.io.Serializable;
    +/**
    + * Registry class to keep track of the current leader ID.
    + */
    +public class LeaderIdRegistry {
    --- End diff --

    Why do you create a registry for a single field?

> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465044#comment-15465044 ] ASF GitHub Bot commented on FLINK-4538: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77524466 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465037#comment-15465037 ]

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77524239

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java ---
    @@ -46,13 +47,21 @@
         /** if the slot is allocated, jobId identify which job this slot is allocated to; else, jobId is null */
         private final JobID jobID;
    -    public SlotStatus(SlotID slotID, ResourceProfile profiler) {
    -        this(slotID, profiler, null, null);
    +    /** Gateway to the TaskManager which reported the SlotStatus */
    +    private final TaskExecutorGateway taskExecutorGateway;
    --- End diff --

    The `SlotStatus` is no longer serializable with this field. Where does the `SlotStatus` come from? If it's coming from the `TaskExecutor`, then the `taskExecutorGateway` has to be retrieved on the `ResourceManager` side.

> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465028#comment-15465028 ]

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77523985

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java ---
    @@ -32,4 +33,11 @@
         // void notifyOfNewResourceManagerLeader(String address, UUID resourceManagerLeaderId);
    +
    +    /**
    +     * Send by the ResourceManager to the TaskExecutor
    +     * @param allocationID id for the request
    +     * @param resourceManagerLeaderID current leader id of the ResourceManager
    +     */
    +    void requestSlot(AllocationID allocationID, UUID resourceManagerLeaderID);
    --- End diff --

    How is the confirmation of the `TaskExecutor` sent back to the `SlotManager`? Would it make sense to send it back via the return value of this method?

> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465023#comment-15465023 ] ASF GitHub Bot commented on FLINK-4538: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77523806 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465005#comment-15465005 ]

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77522737

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -131,9 +149,16 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
          * @return Slot assignment
          */
         @RpcMethod
    -    public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -        System.out.println("SlotRequest: " + slotRequest);
    -        return new SlotAssignment();
    +    public SlotRequestRegistered requestSlot(SlotRequest slotRequest) {
    +        final JobID jobId = slotRequest.getJobId();
    +        final JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId);
    +
    +        if (jobMasterGateway != null) {
    +            return slotManager.requestSlot(slotRequest);
    +        } else {
    +            LOG.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
    +            return null;
    --- End diff --

    Not sure whether we should return `null` here, a negative `SlotRequestRegistered` response or throw an exception which will be handled by the caller. Why did you choose `null`?

> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465000#comment-15465000 ]

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2463#discussion_r77522339

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -52,15 +58,28 @@
      *
      */
     public class ResourceManager extends RpcEndpoint {
    -    private final MapjobMasterGateways;
    +
    +    private final Logger LOG = LoggerFactory.getLogger(getClass());
    --- End diff --

    Why not make it static?

> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15458692#comment-15458692 ] ASF GitHub Bot commented on FLINK-4538: --- Github user mxm commented on the issue: https://github.com/apache/flink/pull/2463 CC @tillrohrmann @StephanEwen > Implement slot allocation protocol with JobMaster > - > > Key: FLINK-4538 > URL: https://issues.apache.org/jira/browse/FLINK-4538 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Maximilian Michels >Assignee: Maximilian Michels > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15458686#comment-15458686 ]

ASF GitHub Bot commented on FLINK-4538:
---

GitHub user mxm opened a pull request:

    https://github.com/apache/flink/pull/2463

    [FLINK-4538][FLINK-4348] ResourceManager slot allocation protocol

    This implements and tests the ResourceManager part of the protocol for slot allocation.

    - associates JobMasters with JobID instead of InstanceID
    - adds TaskExecutorGateway to slot and notify from SlotManager
    - adds SlotManager as RM constructor parameter
    - adds LeaderIdRetriever to keep track of the leader id
    - tests the interaction JM->RM requestSlot
    - tests the interaction RM->TM requestSlot

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mxm/flink flip-6

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2463.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2463

commit 213f4ee6a30bd87e9e04c5a4b22022e0636db9e9
Author: Maximilian Michels
Date: 2016-09-01T14:53:31Z

    [FLINK-4538][FLINK-4348] ResourceManager slot allocation protocol

    - associates JobMasters with JobID instead of InstanceID
    - adds TaskExecutorGateway to slot
    - adds SlotManager as RM constructor parameter
    - adds LeaderIdRetriever to keep track of the leader id
    - tests the interaction JM->RM requestSlot
    - tests the interaction RM->TM requestSlot

> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels

--
This message was sent by Atlassian JIRA (v6.3.4#6332)