[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16712632#comment-16712632 ]

ASF GitHub Bot commented on FLINK-10431:

StefanRRichter commented on issue #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#issuecomment-445192951

@Clark the background is that it makes things easier. Otherwise you have concurrency between two components that want to interact in transactional ways: if the scheduler runs in a different thread than the slot pool, there can be concurrent modifications to the slot pool (e.g. slots added/removed) between the scheduler asking for the available slots and the scheduler requesting one of those slots. All of this has to be resolved, and the code becomes harder to understand and reason about. This can be avoided if the scheduler and the slot pool run in the same thread, and we are also aiming to have all modifications to the execution graph happen in that same thread. The threading model would then be that blocking or expensive operations run in their own threads so that the main thread is never blocked, but their results are always synced back to the main thread, which runs all modifications to the scheduler, slot pool, execution graph, etc.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Extract scheduling-related code from SlotPool
> ---------------------------------------------
>
> Key: FLINK-10431
> URL: https://issues.apache.org/jira/browse/FLINK-10431
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination
> Affects Versions: 1.7.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Labels: pull-request-available
>
> The other half of the current scheduling logic is the management of slot
> sharing and is located in the SlotPool. We need to extract this logic into
> our new Scheduler component from the previous step. This leaves us with a
> simpler SlotPool that mainly cares about obtaining, holding, and releasing
> slots in interaction with a ResourceManager. The new Scheduler can now
> identify slot sharing groups and interact with the SlotPool.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
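The threading model described in the comment above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not Flink code: `MainThreadModelSketch`, `expensiveLookup`, and the slot names are hypothetical, and a plain single-threaded `ExecutorService` stands in for the main thread. Expensive work runs on a separate pool, and every state change is handed back to the single main thread, so the shared list needs no locking.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch; none of these names are taken from Flink. */
class MainThreadModelSketch {

    /** Stands in for a blocking or expensive operation (assumption for illustration). */
    static String expensiveLookup(String slotId) {
        return slotId;
    }

    /** Runs two expensive lookups off the main thread and applies their results on the main thread. */
    static List<String> runDemo() {
        // The single "main thread": every modification of the shared state happens here.
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        // Separate pool for blocking or expensive work, so the main thread is never blocked.
        ExecutorService ioPool = Executors.newFixedThreadPool(2);
        // Unsynchronized state; safe only because a single thread ever touches it.
        List<String> availableSlots = new ArrayList<>();
        try {
            CompletableFuture<Void> first = CompletableFuture
                .supplyAsync(() -> expensiveLookup("slot-1"), ioPool)
                // Sync the result back: the mutation runs on the main thread.
                .thenAcceptAsync(availableSlots::add, mainThread);
            CompletableFuture<Void> second = CompletableFuture
                .supplyAsync(() -> expensiveLookup("slot-2"), ioPool)
                .thenAcceptAsync(availableSlots::add, mainThread);
            CompletableFuture.allOf(first, second).join();

            // Reads of the state also go through the main thread.
            CompletableFuture<List<String>> snapshot =
                CompletableFuture.supplyAsync(() -> new ArrayList<>(availableSlots), mainThread);
            return snapshot.join();
        } finally {
            ioPool.shutdown();
            mainThread.shutdown();
        }
    }

    public static void main(String[] args) {
        // The order of the two entries depends on which lookup finishes first.
        System.out.println(runDemo());
    }
}
```

Because both callbacks are queued on the same single-threaded executor, they can never interleave with each other or with the final read, which is the property the comment is after.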
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704330#comment-16704330 ]

ASF GitHub Bot commented on FLINK-10431:

Clark commented on issue #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#issuecomment-443109202

Hi @StefanRRichter, I am just wondering why SlotPool should no longer be an RpcEndpoint?
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663665#comment-16663665 ]

ASF GitHub Bot commented on FLINK-10431:

StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228148052

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java ##
@@ -312,7 +313,7 @@ public void validateRunsInMainThread() {
 	/**
 	 * Executor which executes runnables in the main thread context.
 	 */
-	protected static class MainThreadExecutor implements Executor {
+	protected static class MainThreadExecutor implements MainThreadExecutable {

Review comment:
I was also thinking about that. Are you suggesting replacing the current methods with the ones from `ScheduledExecutor`, or adding them on top of the existing methods?
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663662#comment-16663662 ]

ASF GitHub Bot commented on FLINK-10431:

StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228147167

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java ##
@@ -145,17 +149,61 @@
 	 * If the returned future must not be completed right away (a.k.a. the slot request
 	 * can be queued), allowQueuedScheduling must be set to true.
 	 *
+	 * @deprecated this method will be removed once the handling of slot sharing is completely extracted from the slot
+	 * pool into a dedicated {@link Scheduler} component. The call is then replaced by calls to
+	 * {@link #getAvailableSlotsInformation()}, {@link #allocateAvailableSlot(SlotRequestId, AllocationID)}, and
+	 * {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, Time)}.
+	 *
 	 * @param slotRequestId identifying the requested slot
 	 * @param scheduledUnit for which to allocate slot
 	 * @param slotProfile profile that specifies the requirements for the requested slot
 	 * @param allowQueuedScheduling true if the slot request can be queued (e.g. the returned future must not be completed)
 	 * @param timeout for the operation
 	 * @return Future which is completed with the allocated {@link LogicalSlot}
 	 */
+	@Deprecated
 	CompletableFuture allocateSlot(
 		SlotRequestId slotRequestId,
 		ScheduledUnit scheduledUnit,
 		SlotProfile slotProfile,
 		boolean allowQueuedScheduling,
 		@RpcTimeout Time timeout);
+
+	/**
+	 * Returns a list of {@link SlotInfo} objects about all slots that are currently available in the slot
+	 * pool.
+	 *
+	 * @return a list of {@link SlotInfo} objects about all slots that are currently available in the slot pool.
+	 */
+	@Nonnull
+	List getAvailableSlotsInformation();
+
+	/**
+	 * Allocates the available slot with the given allocation id under the given request id. This method returns
+	 * {@code null} if no slot with the given allocation id is available.
+	 *
+	 * @param slotRequestId identifying the requested slot
+	 * @param allocationID the allocation id of the requested available slot
+	 * @return the previously available slot with the given allocation id or {@code null} if no such slot existed.
+	 */
+	@Nullable
+	AllocatedSlot allocateAvailableSlot(

Review comment:
In general, I like the idea, but I wonder how this is easily possible? For these methods I agree, but the interface also has `requestNewAllocatedSlot`, which also exposes `AllocatedSlot` and is typically followed by attempts to assign a payload.
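The Javadoc in the diff above describes a two-step interaction: list the available slots, then claim one by its allocation id, getting `null` back if that slot is gone. A minimal sketch of that flow, under loud assumptions: `SlotPoolSketch`, `offerSlot`, and `scheduleOne` are hypothetical, and plain strings stand in for `SlotRequestId`, `AllocationID`, `SlotInfo`, and `AllocatedSlot`. It is not Flink's SlotPool.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified stand-in for a slot pool; strings replace Flink's id and slot types. */
class SlotPoolSketch {

    // allocation id -> resource description of the currently available slots
    private final Map<String, String> available = new LinkedHashMap<>();
    // request id -> allocation id of the slots that were handed out
    private final Map<String, String> allocated = new LinkedHashMap<>();

    /** Hypothetical helper that makes a slot available to the pool. */
    void offerSlot(String allocationId, String profile) {
        available.put(allocationId, profile);
    }

    /** Step 1: snapshot of the allocation ids of all currently available slots. */
    List<String> getAvailableSlotsInformation() {
        return new ArrayList<>(available.keySet());
    }

    /** Step 2: claim a slot by id; returns null if it is no longer available. */
    String allocateAvailableSlot(String requestId, String allocationId) {
        if (available.remove(allocationId) == null) {
            return null;
        }
        allocated.put(requestId, allocationId);
        return allocationId;
    }

    /** Scheduler side: pick the first candidate that can still be claimed. */
    static String scheduleOne(SlotPoolSketch pool, String requestId) {
        for (String candidate : pool.getAvailableSlotsInformation()) {
            String slot = pool.allocateAvailableSlot(requestId, candidate);
            if (slot != null) {
                return slot;
            }
        }
        // Nothing available; a caller would fall back to requesting a new slot.
        return null;
    }

    public static void main(String[] args) {
        SlotPoolSketch pool = new SlotPoolSketch();
        pool.offerSlot("a1", "small");
        pool.offerSlot("a2", "large");
        System.out.println(scheduleOne(pool, "r1"));
        System.out.println(pool.getAvailableSlotsInformation());
    }
}
```

The `null` check in `scheduleOne` only matters if the pool can change between the two calls. If both components run in the same thread, the snapshot from step 1 is still valid when step 2 runs.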
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663506#comment-16663506 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228105187

## File path: flink-tests/src/test/resources/log4j-test.properties ##
@@ -18,7 +18,7 @@
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=DEBUG, testlogger

Review comment:
Please revert
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663512#comment-16663512 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228103506

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java ##
@@ -315,6 +315,11 @@ public int getPhysicalSlotNumber() {
 		return 0;
 	}

+	@Override
+	public ResourceProfile getResourceProfile() {
+		throw new UnsupportedOperationException("Not implemented");

Review comment:
Could the return value be `ResourceProfile#UNKNOWN`?
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663507#comment-16663507 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228104451

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java ##
@@ -404,7 +392,7 @@ public void testReleaseResource() throws Exception {
 	 */
 	@Test
 	public void testSlotRequestCancellationUponFailingRequest() throws Exception {
-		final SlotPool slotPool = new SlotPool(rpcService, jobId, LocationPreferenceSchedulingStrategy.getInstance());
+		final SlotPool slotPool = new SlotPool( jobId, LocationPreferenceSchedulingStrategy.getInstance());

Review comment:
whitespace after `(`
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663518#comment-16663518 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r228104159 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java ## @@ -1,428 +1,428 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.jobmaster.slotpool; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.SlotProfile; -import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; -import org.apache.flink.runtime.instance.SlotSharingGroupId; -import org.apache.flink.runtime.jobmanager.scheduler.DummyScheduledUnit; -import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; -import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils; -import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; -import org.apache.flink.runtime.jobmaster.JobMasterId; -import org.apache.flink.runtime.jobmaster.LogicalSlot; -import org.apache.flink.runtime.jobmaster.SlotRequestId; -import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; -import org.apache.flink.runtime.resourcemanager.SlotRequest; -import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; -import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.RpcUtils; -import org.apache.flink.runtime.rpc.akka.AkkaRpcService; -import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; -import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.runtime.util.clock.Clock; -import org.apache.flink.runtime.util.clock.SystemClock; -import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.TestLogger; - -import akka.actor.ActorSystem; -import org.junit.AfterClass; 
-import org.junit.BeforeClass; -import org.junit.Test; - -import javax.annotation.Nullable; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; -import java.util.function.Consumer; - -import static org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest.DEFAULT_TESTING_PROFILE; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * Tests for the SlotPool using a proper RPC setup. - */ -public class SlotPoolRpcTest extends TestLogger { - - private static RpcService rpcService; - - private static final Time timeout = Time.seconds(10L); - - private static final Time fastTimeout = Time.milliseconds(1L); - - // - // setup - // - - @BeforeClass - public static void setup() { - ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); - rpcService = new AkkaRpcService(actorSystem, Time.seconds(10)); - } - - @AfterClass - public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException { - if (rpcService != null) { - RpcUtils.terminateRpcService(rpcService, timeout); - rpcService = null; - } -
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663511#comment-16663511 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228101887

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java ##
@@ -107,6 +107,7 @@ public void testSlotReleaseOnFailedResourceAssignment() throws Exception {
 		slotProvider,
 		false,
 		LocationPreferenceConstraint.ALL,
+		null,

Review comment:
I think we need to update this test and pass in `Collections.emptySet()`.
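A minimal sketch of why `Collections.emptySet()` is the safer argument here, assuming the new parameter is a set that the callee queries without a null check. The source does not say what the parameter represents, so `EmptySetArgumentSketch` and `countKnown` are hypothetical and not taken from Flink.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

/** Hypothetical callee; the names are illustrative only. */
class EmptySetArgumentSketch {

    /** Counts the candidates contained in the given set; the set is used without a null check. */
    static int countKnown(List<String> candidates, Set<String> known) {
        int count = 0;
        for (String candidate : candidates) {
            if (known.contains(candidate)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> candidates = Arrays.asList("a", "b");

        // An empty set means "no entries" and needs no special handling in the callee.
        System.out.println(countKnown(candidates, Collections.<String>emptySet()));

        // A null argument only fails once the callee dereferences it.
        try {
            countKnown(candidates, null);
        } catch (NullPointerException e) {
            System.out.println("null argument fails inside the callee");
        }
    }
}
```

With the empty set, tests exercise the same code path as production callers that happen to have nothing to pass.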
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663513#comment-16663513 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228102058

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java ##
@@ -71,7 +71,7 @@ public void testSlotReleasedWhenScheduledImmediately() {
 	assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 	// try to deploy to the slot
-	vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL);
+	vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL, null);

Review comment:
`Collections.emptySet()`
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663517#comment-16663517 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228102608

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java ##
@@ -111,7 +112,8 @@ public void testScheduleImmediately() throws Exception {
 		assertEquals(5, testingSlotProvider.getNumberOfAvailableSlots());
 	}
-
+
+	@Ignore

Review comment:
Why do we ignore this test?
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663509#comment-16663509 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228102113

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java ##
@@ -157,7 +157,7 @@ public void testMultiRegionsFailover() throws Exception {
 	assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
-	ev21.scheduleForExecution(slotProvider, true, LocationPreferenceConstraint.ALL);
+	ev21.scheduleForExecution(slotProvider, true, LocationPreferenceConstraint.ALL, null);

Review comment:
`Collections.emptySet()`
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663508#comment-16663508 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228105242

## File path: flink-tests/src/test/resources/log4j-test.properties ##
@@ -29,3 +29,4 @@ log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 # suppress the irrelevant (wrong) warnings from the netty channel handler
 log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR
 log4j.logger.org.apache.zookeeper=OFF
+log4j.logger.org.apache.flink.runtime.jobmaster.slotpool=DEBUG

Review comment: Please revert
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663514#comment-16663514 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228101983

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java ##
@@ -454,7 +454,7 @@ public void testScheduleOrDeployAfterCancel() {
         // it can occur as the result of races
         {
             Scheduler scheduler = mock(Scheduler.class);
-            vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL);
+            vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL, null);

Review comment: Same here with the test.
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663515#comment-16663515 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228104545

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestMainThreadExecutor.java ##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TestMainThreadExecutor implements MainThreadExecutable {

Review comment: JavaDocs missing.
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663510#comment-16663510 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228102692

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java ##
@@ -817,7 +818,8 @@ public void testSequentialAllocateAndRelease() {
             fail(e.getMessage());
         }
     }
-
+
+    @Ignore

Review comment: Why `@Ignore`?
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663516#comment-16663516 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228105123

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestMainThreadExecutor.java ##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TestMainThreadExecutor implements MainThreadExecutable {
+
+    private final ScheduledExecutorService scheduledExecutor;
+
+    public TestMainThreadExecutor(ScheduledExecutorService scheduledExecutor) {
+        this.scheduledExecutor = scheduledExecutor;
+    }
+
+    public TestMainThreadExecutor() {
+        this(Executors.newSingleThreadScheduledExecutor());

Review comment: Shouldn't this class be an `ExecutorService` to close the created `Executor` properly? Otherwise we might run into problems when running tests using this `TestMainThreadExecutor` in a loop, for example.
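The concern in the comment above is that each default-constructed instance starts a thread that nothing ever stops. A minimal sketch of one way to address it is shown below. It uses `AutoCloseable` rather than the full `ExecutorService` interface the reviewer mentions, purely to keep the example short. The class `ClosableTestExecutor` and its methods are hypothetical and are not the code from the pull request.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ClosableTestExecutor implements AutoCloseable {

    private final ScheduledExecutorService scheduledExecutor;

    public ClosableTestExecutor() {
        // Each instance owns one thread, so each instance must also release it.
        this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
    }

    public void runAsync(Runnable runnable) {
        scheduledExecutor.execute(runnable);
    }

    public boolean isShutdown() {
        return scheduledExecutor.isShutdown();
    }

    @Override
    public void close() {
        // Let already submitted tasks finish, then force termination if they take too long.
        scheduledExecutor.shutdown();
        try {
            if (!scheduledExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                scheduledExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduledExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
```

A test could then create the executor in a try-with-resources block, so that running the test in a loop does not accumulate idle threads.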
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663489#comment-16663489 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228100910

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ##
@@ -324,78 +297,98 @@ public void disconnectResourceManager() {
             boolean allowQueuedScheduling,
             Time allocationTimeout) {
-        log.debug("Received slot request [{}] for task: {}", slotRequestId, task.getTaskToExecute());
-
-        final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
-
-        if (slotSharingGroupId != null) {
-            // allocate slot with slot sharing
-            final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
-                slotSharingGroupId,
-                id -> new SlotSharingManager(
-                    id,
-                    this,
-                    providerAndOwner));
-
-            final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality;
-
-            try {
-                if (task.getCoLocationConstraint() != null) {
-                    multiTaskSlotLocality = allocateCoLocatedMultiTaskSlot(
-                        task.getCoLocationConstraint(),
-                        multiTaskSlotManager,
-                        slotProfile,
-                        allowQueuedScheduling,
-                        allocationTimeout);
+        return CompletableFuture.completedFuture(null).thenComposeAsync((i) -> {

Review comment: Stefan and I had an offline discussion and concluded that the porting scaffolds should now be removed because they are no longer needed. The `Scheduler` should now be responsible for the `LogicalSlot` allocation.
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663487#comment-16663487 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228100255

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java ##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on previous allocations and
+ * falls back to using location preference hints if there is no previous allocation.
+ */
+public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStrategy {
+
+    public static final PreviousAllocationSlotSelectionStrategy INSTANCE =
+        new PreviousAllocationSlotSelectionStrategy();
+
+    private final LocationPreferenceSlotSelection locationPreferenceSlotSelection;
+
+    private PreviousAllocationSlotSelectionStrategy() {
+        this.locationPreferenceSlotSelection = LocationPreferenceSlotSelection.INSTANCE;
+    }
+
+    @Nonnull
+    @Override
+    public SlotInfoAndLocality selectBestSlotForProfile(
+        @Nonnull List availableSlots,
+        @Nonnull SlotProfile slotProfile) {
+
+        Collection priorAllocations = slotProfile.getPreferredAllocations();
+
+        // First, if there was a prior allocation try to schedule to the same/old slot
+        if (!priorAllocations.isEmpty()) {
+
+            HashSet priorAllocationsSet = new HashSet<>(priorAllocations);

Review comment: Why not change the return type of `SlotProfile#getPreferredAllocations` into `Set`?
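The suggestion above would move the set construction out of the selection strategy: if the profile already exposes its preferred allocations as a set, callers can run membership checks without copying the collection on every call. A rough sketch of that idea follows, with plain strings standing in for allocation IDs. The classes `PreferredAllocationsSketch` and `Profile` and the method `selectPreviouslyUsed` are hypothetical illustrations and not Flink's `SlotProfile` API.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class PreferredAllocationsSketch {

    // Hypothetical profile that builds the set once, at construction time.
    static final class Profile {
        private final Set<String> preferredAllocations;

        Profile(Collection<String> preferredAllocations) {
            this.preferredAllocations =
                Collections.unmodifiableSet(new HashSet<>(preferredAllocations));
        }

        Set<String> getPreferredAllocations() {
            return preferredAllocations;
        }
    }

    // Returns the first available slot that was used before, if there is one.
    static Optional<String> selectPreviouslyUsed(List<String> availableSlots, Profile profile) {
        Set<String> priorAllocations = profile.getPreferredAllocations();
        for (String slot : availableSlots) {
            // Set membership test, no per-call HashSet copy needed.
            if (priorAllocations.contains(slot)) {
                return Optional.of(slot);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Profile profile = new Profile(Arrays.asList("c", "b"));
        System.out.println(selectPreviouslyUsed(Arrays.asList("a", "b", "c"), profile));
    }
}
```

The trade-off is that the profile pays the cost of building the set once, even for callers that only iterate over it.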
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663400#comment-16663400 ]

ASF GitHub Bot commented on FLINK-10431:

StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228066391

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ##
@@ -324,78 +297,98 @@ public void disconnectResourceManager() {
             boolean allowQueuedScheduling,
             Time allocationTimeout) {
-        log.debug("Received slot request [{}] for task: {}", slotRequestId, task.getTaskToExecute());
-
-        final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
-
-        if (slotSharingGroupId != null) {
-            // allocate slot with slot sharing
-            final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
-                slotSharingGroupId,
-                id -> new SlotSharingManager(
-                    id,
-                    this,
-                    providerAndOwner));
-
-            final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality;
-
-            try {
-                if (task.getCoLocationConstraint() != null) {
-                    multiTaskSlotLocality = allocateCoLocatedMultiTaskSlot(
-                        task.getCoLocationConstraint(),
-                        multiTaskSlotManager,
-                        slotProfile,
-                        allowQueuedScheduling,
-                        allocationTimeout);
+        return CompletableFuture.completedFuture(null).thenComposeAsync((i) -> {

Review comment: In the end, yes, absolutely right. But this is currently on purpose, and the same applies to some of your other comments that there is still slot pool code around that could be removed. The purpose is that you can now switch back and forth between the new and the old code path by changing a single line in the job master, and that the change stays as incremental as possible, as we previously discussed.
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662539#comment-16662539 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227849935

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ##
@@ -324,78 +297,98 @@ public void disconnectResourceManager() {
             boolean allowQueuedScheduling,
             Time allocationTimeout) {
-        log.debug("Received slot request [{}] for task: {}", slotRequestId, task.getTaskToExecute());
-
-        final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
-
-        if (slotSharingGroupId != null) {
-            // allocate slot with slot sharing
-            final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
-                slotSharingGroupId,
-                id -> new SlotSharingManager(
-                    id,
-                    this,
-                    providerAndOwner));
-
-            final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality;
-
-            try {
-                if (task.getCoLocationConstraint() != null) {
-                    multiTaskSlotLocality = allocateCoLocatedMultiTaskSlot(
-                        task.getCoLocationConstraint(),
-                        multiTaskSlotManager,
-                        slotProfile,
-                        allowQueuedScheduling,
-                        allocationTimeout);
+        return CompletableFuture.completedFuture(null).thenComposeAsync((i) -> {

Review comment: Shouldn't this method only be called by the new `Scheduler`, which already wraps all external calls into the main thread? If yes, then we don't need to do it here again.
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662544#comment-16662544 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227872252

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java ##
@@ -145,17 +149,61 @@
  * If the returned future must not be completed right away (a.k.a. the slot request
  * can be queued), allowQueuedScheduling must be set to true.
  *
+ * @deprecated this method will be removed once the handling of slot sharing is completely extracted from the slot
+ * pool into a dedicated {@link Scheduler} component. The call is then replaced by calls to
+ * {@link #getAvailableSlotsInformation()}, {@link #allocateAvailableSlot(SlotRequestId, AllocationID)}, and
+ * {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, Time)}.

Review comment: What's preventing us from removing this method?
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662540#comment-16662540 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227848647

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ##
@@ -324,78 +297,98 @@ public void disconnectResourceManager() {
             boolean allowQueuedScheduling,
             Time allocationTimeout) {
-        log.debug("Received slot request [{}] for task: {}", slotRequestId, task.getTaskToExecute());
-
-        final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId();
-
-        if (slotSharingGroupId != null) {
-            // allocate slot with slot sharing
-            final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
-                slotSharingGroupId,
-                id -> new SlotSharingManager(
-                    id,
-                    this,
-                    providerAndOwner));
-
-            final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality;
-
-            try {
-                if (task.getCoLocationConstraint() != null) {
-                    multiTaskSlotLocality = allocateCoLocatedMultiTaskSlot(
-                        task.getCoLocationConstraint(),
-                        multiTaskSlotManager,
-                        slotProfile,
-                        allowQueuedScheduling,
-                        allocationTimeout);
+        return CompletableFuture.completedFuture(null).thenComposeAsync((i) -> {

Review comment: `CompletableFuture.supplyAsync()`
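The one-word suggestion above can be illustrated with plain `java.util.concurrent` code. The sketch below contrasts the pattern from the diff, an already completed future used only as a hook for `thenComposeAsync`, with `CompletableFuture.supplyAsync`. One detail to watch, under the assumption that the lambda itself returns a future as in the diff: `supplyAsync` then yields a nested future that has to be flattened with `thenCompose`. The class `SupplyAsyncSketch`, its method names, and the `"slot"` payload are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class SupplyAsyncSketch {

    // Pattern from the diff: a dummy completed future used to hop onto the executor.
    static CompletableFuture<String> viaCompletedFuture(Executor executor) {
        return CompletableFuture.completedFuture(null)
            .thenComposeAsync(ignored -> CompletableFuture.completedFuture("slot"), executor);
    }

    // Alternative: start directly on the executor, then flatten the nested future.
    static CompletableFuture<String> viaSupplyAsync(Executor executor) {
        return CompletableFuture
            .supplyAsync(() -> CompletableFuture.completedFuture("slot"), executor)
            .thenCompose(future -> future);
    }

    public static void main(String[] args) {
        // Runnable::run executes tasks on the calling thread, which keeps the demo deterministic.
        Executor directExecutor = Runnable::run;
        System.out.println(viaCompletedFuture(directExecutor).join());
        System.out.println(viaSupplyAsync(directExecutor).join());
    }
}
```

If the lambda returned a plain value instead of a future, `supplyAsync(() -> value, executor)` alone would be enough and no flattening step would be needed.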
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662546#comment-16662546 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227877459

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java ##

@@ -312,7 +313,7 @@ public void validateRunsInMainThread() {
     /**
      * Executor which executes runnables in the main thread context.
      */
-    protected static class MainThreadExecutor implements Executor {
+    protected static class MainThreadExecutor implements MainThreadExecutable {

Review comment:
`MainThreadExecutable` was actually intended to be a marker interface for the `RpcServer`. I would recommend letting the `MainThreadExecutor` implement the `ScheduledExecutor` instead. That way, there is no need to change the `MainThreadExecutable`.
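As a rough illustration of the suggestion in this comment (a minimal sketch, not Flink's actual code): one class can offer both immediate and delayed execution on a single thread, so a marker interface does not need to gain new methods. `SchedulingExecutor` below is a hypothetical stand-in for Flink's `ScheduledExecutor`, and the JDK's single-threaded scheduled executor stands in for the RPC main thread; both are assumptions made for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class MainThreadExecutorSketch {

    /** Hypothetical stand-in for a scheduling-capable executor interface. */
    public interface SchedulingExecutor extends Executor {
        ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
    }

    /** Runs immediate and delayed work on one dedicated thread (the assumed "main thread"). */
    public static final class MainThreadExecutor implements SchedulingExecutor, AutoCloseable {
        private final ScheduledExecutorService mainThread =
            Executors.newSingleThreadScheduledExecutor();

        @Override
        public void execute(Runnable command) {
            mainThread.execute(command);
        }

        @Override
        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
            return mainThread.schedule(command, delay, unit);
        }

        @Override
        public void close() {
            mainThread.shutdownNow();
        }
    }

    /** Returns true if an immediate and a delayed task were run by the same thread. */
    public static boolean immediateAndDelayedShareThread() {
        try (MainThreadExecutor executor = new MainThreadExecutor()) {
            CompletableFuture<Thread> immediate = new CompletableFuture<>();
            CompletableFuture<Thread> delayed = new CompletableFuture<>();
            executor.execute(() -> immediate.complete(Thread.currentThread()));
            executor.schedule(
                () -> delayed.complete(Thread.currentThread()), 10, TimeUnit.MILLISECONDS);
            return immediate.get(5, TimeUnit.SECONDS) == delayed.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("Tasks did not complete", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(immediateAndDelayedShareThread());
    }
}
```

Callers that only need `Executor` keep working unchanged, while components that need delays can depend on the richer interface.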
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662547#comment-16662547 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227870899

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ##

@@ -822,6 +835,29 @@ private SlotAndLocality pollAndAllocateSlot(SlotRequestId slotRequestId, SlotPro
         return slotFromPool;
     }
 
+    @Nullable
+    private AllocatedSlot allocateSlotWithID(@Nonnull SlotRequestId slotRequestId, @Nonnull AllocationID allocationID) {
+        AllocatedSlot allocatedSlot = availableSlots.tryRemove(allocationID);
+        if (allocatedSlot != null) {
+            allocatedSlots.add(slotRequestId, allocatedSlot);
+        }
+        return allocatedSlot;
+    }
+
+    @Override
+    @Nullable
+    public AllocatedSlot allocateAvailableSlot(

Review comment:
We could think about making the return type an `Optional`, to make it more explicit that this might not succeed.
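The `Optional` idea from this comment can be sketched as follows. This is an illustrative toy, not the `SlotPool` implementation: the class name, the `String` identifiers, and the two maps are assumptions that stand in for `SlotRequestId`, `AllocationID`, and the pool's internal bookkeeping.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class OptionalAllocationSketch {

    // Hypothetical stand-ins for the pool's "available" and "allocated" bookkeeping.
    private final Map<String, String> availableSlots = new HashMap<>();
    private final Map<String, String> allocatedSlots = new HashMap<>();

    /** Registers a slot as available under the given allocation id. */
    public void offer(String allocationId, String slot) {
        availableSlots.put(allocationId, slot);
    }

    /**
     * Moves the slot with the given allocation id from "available" to "allocated".
     * An empty Optional tells the caller that no such slot was available.
     */
    public Optional<String> allocateAvailableSlot(String slotRequestId, String allocationId) {
        Optional<String> slot = Optional.ofNullable(availableSlots.remove(allocationId));
        slot.ifPresent(s -> allocatedSlots.put(slotRequestId, s));
        return slot;
    }

    public static void main(String[] args) {
        OptionalAllocationSketch pool = new OptionalAllocationSketch();
        pool.offer("alloc-1", "slot-A");
        System.out.println(pool.allocateAvailableSlot("req-1", "alloc-1").isPresent());
        System.out.println(pool.allocateAvailableSlot("req-2", "alloc-1").isPresent());
    }
}
```

Compared with a `@Nullable` return value, the caller has to unwrap the result explicitly, so the "no slot found" case cannot be overlooked silently.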
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662542#comment-16662542 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227872881

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java ##

@@ -145,17 +149,61 @@
      * If the returned future must not be completed right away (a.k.a. the slot request
      * can be queued), allowQueuedScheduling must be set to true.
      *
+     * @deprecated this method will be removed once the handling of slot sharing is completely extracted from the slot
+     * pool into a dedicated {@link Scheduler} component. The call is then replaced by calls to
+     * {@link #getAvailableSlotsInformation()}, {@link #allocateAvailableSlot(SlotRequestId, AllocationID)}, and
+     * {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, Time)}.
+     *
      * @param slotRequestId identifying the requested slot
      * @param scheduledUnit for which to allocate slot
      * @param slotProfile profile that specifies the requirements for the requested slot
      * @param allowQueuedScheduling true if the slot request can be queued (e.g. the returned future must not be completed)
      * @param timeout for the operation
      * @return Future which is completed with the allocated {@link LogicalSlot}
      */
+    @Deprecated
     CompletableFuture<LogicalSlot> allocateSlot(
         SlotRequestId slotRequestId,
         ScheduledUnit scheduledUnit,
         SlotProfile slotProfile,
         boolean allowQueuedScheduling,
         @RpcTimeout Time timeout);
+
+    /**
+     * Returns a list of {@link SlotInfo} objects about all slots that are currently available in the slot
+     * pool.
+     *
+     * @return a list of {@link SlotInfo} objects about all slots that are currently available in the slot pool.
+     */
+    @Nonnull
+    List<SlotInfo> getAvailableSlotsInformation();
+
+    /**
+     * Allocates the available slot with the given allocation id under the given request id. This method returns
+     * {@code null} if no slot with the given allocation id is available.
+     *
+     * @param slotRequestId identifying the requested slot
+     * @param allocationID the allocation id of the requested available slot
+     * @return the previously available slot with the given allocation id or {@code null} if no such slot existed.
+     */
+    @Nullable
+    AllocatedSlot allocateAvailableSlot(
+        @Nonnull SlotRequestId slotRequestId,
+        @Nonnull AllocationID allocationID);
+
+    /**
+     * Request the allocation of a new slot from the resource manager. This method will not return a slot from the
+     * already available slots from the pool, but instead will add a new slot to that pool that is immediately allocated
+     * and returned.
+     *
+     * @param slotRequestId identifying the requested slot
+     * @param resourceProfile resource profile that specifies the resource requirements for the requested slot
+     * @param timeout timeout for the allocation procedure
+     * @return a newly allocated slot that was previously not available.
+     */
+    @Nonnull
+    CompletableFuture<AllocatedSlot> requestNewAllocatedSlot(

Review comment:
Same here with the `AllocatedSlot`
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662548#comment-16662548 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227872948

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java ##

@@ -90,7 +90,7 @@
      * @param cause of the cancellation
      * @return Future which is completed once the slot request has been cancelled
      */
-    CompletableFuture<Acknowledge> cancelSlotRequest(
+    Acknowledge cancelSlotRequest(

Review comment:
Can this become `void`?
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662543#comment-16662543 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227871624

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ##

@@ -136,12 +143,13 @@
     private String jobManagerAddress;
 
+    private MainThreadExecutable jmMainThreadScheduledExecutor;
+
     //
 
     @VisibleForTesting
-    protected SlotPool(RpcService rpcService, JobID jobId, SchedulingStrategy schedulingStrategy) {
+    protected SlotPool(JobID jobId, SchedulingStrategy schedulingStrategy) {

Review comment:
This class still contains a lot of code that has actually moved to the new `Scheduler`. Can we remove this code? Having this code still in this class makes it extremely hard to review. Of course, this would mean that we also need to adapt the `SlotPoolTests`, which makes sense anyway.
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662541#comment-16662541 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227872341

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java ##

@@ -145,17 +149,61 @@
      * If the returned future must not be completed right away (a.k.a. the slot request
      * can be queued), allowQueuedScheduling must be set to true.
      *
+     * @deprecated this method will be removed once the handling of slot sharing is completely extracted from the slot
+     * pool into a dedicated {@link Scheduler} component. The call is then replaced by calls to
+     * {@link #getAvailableSlotsInformation()}, {@link #allocateAvailableSlot(SlotRequestId, AllocationID)}, and
+     * {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, Time)}.
+     *
      * @param slotRequestId identifying the requested slot
      * @param scheduledUnit for which to allocate slot
      * @param slotProfile profile that specifies the requirements for the requested slot
      * @param allowQueuedScheduling true if the slot request can be queued (e.g. the returned future must not be completed)
      * @param timeout for the operation
      * @return Future which is completed with the allocated {@link LogicalSlot}
      */
+    @Deprecated
     CompletableFuture<LogicalSlot> allocateSlot(
         SlotRequestId slotRequestId,
         ScheduledUnit scheduledUnit,
         SlotProfile slotProfile,
         boolean allowQueuedScheduling,
         @RpcTimeout Time timeout);
+
+    /**
+     * Returns a list of {@link SlotInfo} objects about all slots that are currently available in the slot
+     * pool.
+     *
+     * @return a list of {@link SlotInfo} objects about all slots that are currently available in the slot pool.
+     */
+    @Nonnull
+    List<SlotInfo> getAvailableSlotsInformation();

Review comment:
Let's make the return type a `Collection`.
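The comment does not give a reason for preferring `Collection`. One common motivation, offered here only as an assumption, is that the narrower contract lets the implementation hand out a view of its internal storage instead of copying it into a `List`. The sketch below uses hypothetical names and `String` values in place of `SlotInfo`.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class AvailableSlotsViewSketch {

    // Hypothetical internal storage: available slots keyed by allocation id.
    private final Map<String, String> availableSlotsById = new LinkedHashMap<>();

    public void addAvailableSlot(String allocationId, String slotInfo) {
        availableSlotsById.put(allocationId, slotInfo);
    }

    /**
     * Returns a read-only view of the available slots. Because the declared type is
     * Collection, the map's values can be exposed directly without building a List.
     * Note that this is a live view: it reflects later changes to the map.
     */
    public Collection<String> getAvailableSlotsInformation() {
        return Collections.unmodifiableCollection(availableSlotsById.values());
    }

    public static void main(String[] args) {
        AvailableSlotsViewSketch pool = new AvailableSlotsViewSketch();
        pool.addAvailableSlot("alloc-1", "host-a:0");
        pool.addAvailableSlot("alloc-2", "host-b:1");
        System.out.println(pool.getAvailableSlotsInformation().size());
    }
}
```

Whether a live view is acceptable depends on the threading model; if callers run in a different thread than the owner of the map, a defensive copy would be the safer choice.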
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662545#comment-16662545 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227872749

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java ##

@@ -145,17 +149,61 @@
      * If the returned future must not be completed right away (a.k.a. the slot request
      * can be queued), allowQueuedScheduling must be set to true.
      *
+     * @deprecated this method will be removed once the handling of slot sharing is completely extracted from the slot
+     * pool into a dedicated {@link Scheduler} component. The call is then replaced by calls to
+     * {@link #getAvailableSlotsInformation()}, {@link #allocateAvailableSlot(SlotRequestId, AllocationID)}, and
+     * {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, Time)}.
+     *
      * @param slotRequestId identifying the requested slot
      * @param scheduledUnit for which to allocate slot
      * @param slotProfile profile that specifies the requirements for the requested slot
      * @param allowQueuedScheduling true if the slot request can be queued (e.g. the returned future must not be completed)
      * @param timeout for the operation
      * @return Future which is completed with the allocated {@link LogicalSlot}
      */
+    @Deprecated
     CompletableFuture<LogicalSlot> allocateSlot(
         SlotRequestId slotRequestId,
         ScheduledUnit scheduledUnit,
         SlotProfile slotProfile,
         boolean allowQueuedScheduling,
         @RpcTimeout Time timeout);
+
+    /**
+     * Returns a list of {@link SlotInfo} objects about all slots that are currently available in the slot
+     * pool.
+     *
+     * @return a list of {@link SlotInfo} objects about all slots that are currently available in the slot pool.
+     */
+    @Nonnull
+    List<SlotInfo> getAvailableSlotsInformation();
+
+    /**
+     * Allocates the available slot with the given allocation id under the given request id. This method returns
+     * {@code null} if no slot with the given allocation id is available.
+     *
+     * @param slotRequestId identifying the requested slot
+     * @param allocationID the allocation id of the requested available slot
+     * @return the previously available slot with the given allocation id or {@code null} if no such slot existed.
+     */
+    @Nullable
+    AllocatedSlot allocateAvailableSlot(

Review comment:
It's not so nice that we leak the `AllocatedSlot`, which is supposed to be an internal class of the `SlotPool`. I think it would be better to have an interface that allows callers to do what's needed but, for example, does not expose the `releasePayload` call.
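The idea of returning a narrower interface instead of the internal class can be sketched like this. All names below (`SlotView`, `InternalSlot`, `isUsed`) are hypothetical and chosen for illustration; only `releasePayload` is taken from the comment, and its real signature in Flink may differ.

```java
import java.util.HashMap;
import java.util.Map;

public class NarrowSlotViewSketch {

    /** Hypothetical read-only view that is safe to hand to code outside the pool. */
    public interface SlotView {
        String getAllocationId();

        boolean isUsed();
    }

    /** Hypothetical pool-internal slot; releasePayload() is deliberately not on SlotView. */
    static final class InternalSlot implements SlotView {
        private final String allocationId;
        private Object payload;

        InternalSlot(String allocationId, Object payload) {
            this.allocationId = allocationId;
            this.payload = payload;
        }

        @Override
        public String getAllocationId() {
            return allocationId;
        }

        @Override
        public boolean isUsed() {
            return payload != null;
        }

        void releasePayload() {
            payload = null;
        }
    }

    private final Map<String, InternalSlot> slots = new HashMap<>();

    /** Callers only ever see the narrow SlotView type. */
    public SlotView allocate(String allocationId, Object payload) {
        InternalSlot slot = new InternalSlot(allocationId, payload);
        slots.put(allocationId, slot);
        return slot;
    }

    /** Releasing the payload stays an operation of the pool itself. */
    public void release(String allocationId) {
        InternalSlot slot = slots.get(allocationId);
        if (slot != null) {
            slot.releasePayload();
        }
    }
}
```

With this shape, code that receives a `SlotView` cannot release the payload behind the pool's back, because the static type it sees has no such method.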
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660910#comment-16660910 ]

ASF GitHub Bot commented on FLINK-10431:

StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227466414

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java ##

@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on previous allocations and
+ * falls back to using location preference hints if there is no previous allocation.
+ */
+public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStrategy {
+
+    public static final PreviousAllocationSlotSelectionStrategy INSTANCE =
+        new PreviousAllocationSlotSelectionStrategy();
+
+    private final LocationPreferenceSlotSelection locationPreferenceSlotSelection;
+
+    private PreviousAllocationSlotSelectionStrategy() {
+        this.locationPreferenceSlotSelection = LocationPreferenceSlotSelection.INSTANCE;
+    }
+
+    @Nonnull
+    @Override
+    public SlotInfoAndLocality selectBestSlotForProfile(
+        @Nonnull List<SlotInfo> availableSlots,
+        @Nonnull SlotProfile slotProfile) {
+
+        Collection<AllocationID> priorAllocations = slotProfile.getPreferredAllocations();
+
+        // First, if there was a prior allocation try to schedule to the same/old slot
+        if (!priorAllocations.isEmpty()) {
+
+            HashSet<AllocationID> priorAllocationsSet = new HashSet<>(priorAllocations);

Review comment:
In theory, it is a collection, and `contains` on a collection can be expensive, depending on the implementation. A hash set offers ideal performance for `contains`. In practice, this collection is currently always of size 1, so either way it should not matter much. But as the code could evolve, I prefer not to create a potential complexity bomb :)
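The reasoning in this comment can be illustrated with a small, self-contained sketch. It is not the strategy's real code: the method name and the use of `String` ids in place of `AllocationID` and `SlotInfo` are assumptions. The point is that copying the prior allocations into a `HashSet` once makes each membership test expected constant time, whatever `Collection` implementation the caller passed in.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class PriorAllocationLookupSketch {

    /**
     * Returns the first available slot id that also appears in the prior allocations.
     * With a plain List as priorAllocations, calling contains() per slot would cost
     * O(available * prior); the one-time HashSet copy brings this to O(available + prior).
     */
    public static Optional<String> firstPreviouslyAllocated(
            List<String> availableSlotIds,
            Collection<String> priorAllocations) {

        if (priorAllocations.isEmpty()) {
            return Optional.empty();
        }

        Set<String> priorAllocationsSet = new HashSet<>(priorAllocations);
        for (String slotId : availableSlotIds) {
            if (priorAllocationsSet.contains(slotId)) {
                return Optional.of(slotId);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> available = Arrays.asList("a", "b", "c");
        System.out.println(firstPreviouslyAllocated(available, Arrays.asList("c", "b")));
    }
}
```

For a collection of size 1, as described in the comment, the copy is essentially free, so the defensive choice costs little today.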
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660888#comment-16660888 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227450833

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultSchedulerFactory.java ##

@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+
+import javax.annotation.Nonnull;
+
+import java.util.HashMap;
+
+/**
+ * Default implementation of a {@link SchedulerFactory}.
+ */
+public class DefaultSchedulerFactory implements SchedulerFactory {
+
+    @Nonnull
+    private final SlotSelectionStrategy slotSelectionStrategy;
+
+    public DefaultSchedulerFactory(@Nonnull SlotSelectionStrategy slotSelectionStrategy) {
+        this.slotSelectionStrategy = slotSelectionStrategy;
+    }
+
+    @Nonnull
+    @Override
+    public Scheduler createScheduler(@Nonnull SlotPoolGateway slotPoolGateway) {
+        return new Scheduler(new HashMap<>(), slotSelectionStrategy, slotPoolGateway);

Review comment:
What could be a reasonable initial size of the `HashMap`?
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660895#comment-16660895 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227456207

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java ##

@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.function.BooleanSupplier;
+
+/**
+ * Scheduler that assigns tasks to slots. This class is currently work in progress, comments will be updated as we
+ * move forward.
+ */
+public class Scheduler implements SlotProvider, SlotOwner {
+
+    /** Logger */
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    /** Strategy that selects the best slot for a given slot allocation request. */
+    @Nonnull
+    private final SlotSelectionStrategy slotSelectionStrategy;
+
+    /** Managers for the different slot sharing groups. */
+    @Nonnull
+    private final Map slotSharingManagersMap;
+
+    /** The slot pool from which slots are allocated. */
+    @Nonnull
+    private final SlotPoolGateway slotPoolGateway;
+
+    /** Executor for running tasks in the job master's main thread. */
+    @Nonnull
+    private Executor componentMainThreadExecutor;
+
+    /** Predicate to check if the current thread is the job master's main thread. */
+    @Nonnull
+    private BooleanSupplier componentMainThreadCheck;
+
+
+    public Scheduler(
+        @Nonnull Map slotSharingManagersMap,
+        @Nonnull SlotSelectionStrategy slotSelectionStrategy,
+        @Nonnull SlotPoolGateway slotPoolGateway) {
+
+        this.slotSelectionStrategy = slotSelectionStrategy;
+        this.slotSharingManagersMap = slotSharingManagersMap;
+        this.slotPoolGateway = slotPoolGateway;
+        this.componentMainThreadExecutor = (runnable) -> {
+            throw new IllegalStateException("Main thread executor not initialized.");
+        };
+        this.componentMainThreadCheck = () -> {
+            throw new IllegalStateException("Main thread checker not initialized");
+        };
+    }
+
+    public void start(@Nonnull Executor mainThreadExecutor, @Nonnull BooleanSupplier mainThreadCheck) {
+        this.componentMainThreadExecutor = mainThreadExecutor;
+        this.componentMainThreadCheck = mainThreadCheck;
+    }
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660896#comment-16660896 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227460500

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java ##
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.function.BooleanSupplier;
+
+/**
+ * Scheduler that assigns tasks to slots. This class is currently work in progress, comments will be updated as we
+ * move forward.
+ */
+public class Scheduler implements SlotProvider, SlotOwner {
+
+	/** Logger */
+	private final Logger log = LoggerFactory.getLogger(getClass());
+
+	/** Strategy that selects the best slot for a given slot allocation request. */
+	@Nonnull
+	private final SlotSelectionStrategy slotSelectionStrategy;
+
+	/** Managers for the different slot sharing groups. */
+	@Nonnull
+	private final Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagersMap;
+
+	/** The slot pool from which slots are allocated. */
+	@Nonnull
+	private final SlotPoolGateway slotPoolGateway;
+
+	/** Executor for running tasks in the job master's main thread. */
+	@Nonnull
+	private Executor componentMainThreadExecutor;
+
+	/** Predicate to check if the current thread is the job master's main thread. */
+	@Nonnull
+	private BooleanSupplier componentMainThreadCheck;
+
+
+	public Scheduler(
+		@Nonnull Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagersMap,
+		@Nonnull SlotSelectionStrategy slotSelectionStrategy,
+		@Nonnull SlotPoolGateway slotPoolGateway) {
+
+		this.slotSelectionStrategy = slotSelectionStrategy;
+		this.slotSharingManagersMap = slotSharingManagersMap;
+		this.slotPoolGateway = slotPoolGateway;
+		this.componentMainThreadExecutor = (runnable) -> {
+			throw new IllegalStateException("Main thread executor not initialized.");
+		};
+		this.componentMainThreadCheck = () -> {
+			throw new IllegalStateException("Main thread checker not initialized");
+		};
+	}
+
+	public void start(@Nonnull Executor mainThreadExecutor, @Nonnull BooleanSupplier mainThreadCheck) {
+		this.componentMainThreadExecutor = mainThreadExecutor;
+		this.componentMainThreadCheck = mainThreadCheck;
+	}
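The quoted constructor installs placeholders that throw until `start` supplies the real main-thread executor and check. The following is a minimal, self-contained sketch of that fail-fast pattern, not the PR's code: `MainThreadBoundComponent`, `runInMainThread` and `assertRunningInMainThread` are hypothetical names, and a direct executor stands in for the job master's main thread.

```java
import java.util.concurrent.Executor;
import java.util.function.BooleanSupplier;

/** Hypothetical component illustrating the fail-fast placeholder pattern; not Flink code. */
class MainThreadBoundComponent {

    // Placeholders throw until start() installs the real executor and check.
    private Executor mainThreadExecutor = runnable -> {
        throw new IllegalStateException("Main thread executor not initialized.");
    };

    private BooleanSupplier mainThreadCheck = () -> {
        throw new IllegalStateException("Main thread checker not initialized.");
    };

    void start(Executor executor, BooleanSupplier check) {
        this.mainThreadExecutor = executor;
        this.mainThreadCheck = check;
    }

    /** Hands the action over to whatever executor is currently installed. */
    void runInMainThread(Runnable action) {
        mainThreadExecutor.execute(action);
    }

    /** Guard for state-modifying methods that must only run in the main thread. */
    void assertRunningInMainThread() {
        if (!mainThreadCheck.getAsBoolean()) {
            throw new IllegalStateException("Not running in the main thread.");
        }
    }
}

class MainThreadBoundComponentDemo {
    public static void main(String[] args) {
        MainThreadBoundComponent component = new MainThreadBoundComponent();
        Thread mainThread = Thread.currentThread();

        // Assumption for the demo: the calling thread plays the role of the main thread.
        component.start(Runnable::run, () -> Thread.currentThread() == mainThread);
        component.runInMainThread(component::assertRunningInMainThread);
    }
}
```

Using the component before `start` fails immediately with an `IllegalStateException` instead of silently running work on the wrong thread.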
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660893#comment-16660893 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227453190

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelection.java ##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on location preference hints.
+ */
+public class LocationPreferenceSlotSelection implements SlotSelectionStrategy {
+
+	public static final LocationPreferenceSlotSelection INSTANCE = new LocationPreferenceSlotSelection();
+
+	private LocationPreferenceSlotSelection() {
+	}
+
+	/**
+	 * Calculates the candidate's locality score.
+	 */
+	private static final BiFunction<Integer, Integer, Integer> LOCALITY_EVALUATION_FUNCTION =
+		(localWeigh, hostLocalWeigh) -> localWeigh * 10 + hostLocalWeigh;
+
+	@Nonnull
+	@Override
+	public SlotInfoAndLocality selectBestSlotForProfile(
+		@Nonnull List<SlotInfo> availableSlots,
+		@Nonnull SlotProfile slotProfile) {
+
+		Collection<TaskManagerLocation> locationPreferences = slotProfile.getPreferredLocations();
+
+		if (availableSlots.isEmpty()) {
+			return SlotInfoAndLocality.of(null, Locality.UNKNOWN);
+		}
+
+		final ResourceProfile resourceProfile = slotProfile.getResourceProfile();
+
+		// if we have no location preferences, we can only filter by the additional requirements.
+		if (locationPreferences.isEmpty()) {
+			for (SlotInfo candidate : availableSlots) {
+				if (candidate.getResourceProfile().isMatching(resourceProfile)) {
+					return SlotInfoAndLocality.of(candidate, Locality.UNCONSTRAINED);
+				}
+			}
+			return SlotInfoAndLocality.of(null, Locality.UNKNOWN);
+		}
+
+		// we build up two indexes, one for resource id and one for host names of the preferred locations.
+		final Map<ResourceID, Integer> preferredResourceIDs = new HashMap<>(locationPreferences.size());
+		final Map<String, Integer> preferredFQHostNames = new HashMap<>(locationPreferences.size());
+
+		for (TaskManagerLocation locationPreference : locationPreferences) {
+			preferredResourceIDs.merge(locationPreference.getResourceID(), 1, Integer::sum);
+			preferredFQHostNames.merge(locationPreference.getFQDNHostname(), 1, Integer::sum);
+		}
+
+		SlotInfo bestCandidate = null;
+		Locality bestCandidateLocality = Locality.UNKNOWN;
+		int bestCandidateScore = Integer.MIN_VALUE;
+
+		for (SlotInfo candidate : availableSlots) {
+
+			if (candidate.getResourceProfile().isMatching(resourceProfile)) {
+
+				// this gets candidate is local-weigh
+				Integer localWeigh = preferredResourceIDs.getOrDefault(candidate.getTaskManagerLocation().getResourceID(), 0);
+
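The quoted selection logic counts how often each resource ID and host name occurs among the preferred locations and combines the two counts as `localWeigh * 10 + hostLocalWeigh`. A small sketch of that scoring is shown here under simplifying assumptions: plain strings replace `ResourceID` and `TaskManagerLocation`, the class and method names are hypothetical, and the host-name lookup (cut off in the quote) is assumed to mirror the resource-ID lookup.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified stand-in for the locality scoring; strings replace Flink's location types. */
class LocalityScoreSketch {

    /** Counts how often each key occurs, using Map.merge with Integer::sum as in the quoted code. */
    static Map<String, Integer> countOccurrences(List<String> keys) {
        Map<String, Integer> counts = new HashMap<>(keys.size());
        for (String key : keys) {
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    /** Same weighting as the quoted LOCALITY_EVALUATION_FUNCTION. */
    static int score(int localWeigh, int hostLocalWeigh) {
        return localWeigh * 10 + hostLocalWeigh;
    }

    public static void main(String[] args) {
        // Three preferred locations, given once by task manager id and once by host name.
        Map<String, Integer> preferredResourceIds = countOccurrences(Arrays.asList("tm-1", "tm-1", "tm-2"));
        Map<String, Integer> preferredHosts = countOccurrences(Arrays.asList("host-a", "host-a", "host-b"));

        // Assumed: the host lookup works like the resource id lookup in the quote.
        int localWeigh = preferredResourceIds.getOrDefault("tm-1", 0);
        int hostLocalWeigh = preferredHosts.getOrDefault("host-a", 0);
        System.out.println(score(localWeigh, hostLocalWeigh));
    }
}
```

Because the task-manager match is multiplied by 10, a slot on a preferred task manager outranks a slot that only shares a host, as long as fewer than ten preferences point at that host.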
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660890#comment-16660890 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227449157

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ##
@@ -285,7 +290,9 @@ public JobMaster(
 		this.slotPool = checkNotNull(slotPoolFactory).createSlotPool(jobGraph.getJobID());
-		this.slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
+		this.slotPoolGateway = slotPool;

Review comment:
Can we remove this field?
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660889#comment-16660889 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227450350

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotActions.java ##
@@ -39,10 +38,23 @@
 	 * @param slotRequestId identifying the slot to release
 	 * @param slotSharingGroupId identifying the slot sharing group to which the slot belongs, null if none
 	 * @param cause of the slot release, null if none
-	 * @return Acknowledge (future) after the slot has been released
+	 * @return Acknowledge after the slot has been released
 	 */
-	CompletableFuture<Acknowledge> releaseSlot(
+	@Deprecated
+	Acknowledge releaseSlot(

Review comment:
Could be `void`?
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660892#comment-16660892 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227450458

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotActions.java ##
@@ -39,10 +38,23 @@
 	 * @param slotRequestId identifying the slot to release
 	 * @param slotSharingGroupId identifying the slot sharing group to which the slot belongs, null if none
 	 * @param cause of the slot release, null if none
-	 * @return Acknowledge (future) after the slot has been released
+	 * @return Acknowledge after the slot has been released
 	 */
-	CompletableFuture<Acknowledge> releaseSlot(
+	@Deprecated
+	Acknowledge releaseSlot(
 		SlotRequestId slotRequestId,
 		@Nullable SlotSharingGroupId slotSharingGroupId,
 		@Nullable Throwable cause);
+
+	/**
+	 * Releases the slot with the given {@link SlotRequestId}. Additionally, one can provide a cause for the slot release.
+	 *
+	 * @param slotRequestId identifying the slot to release
+	 * @param cause of the slot release, null if none
+	 * @return Acknowledge after the slot has been released
+	 */
+	@Nonnull
+	Acknowledge releaseSlot(

Review comment:
Same here with `void`?
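The two `void` suggestions on `releaseSlot` can be read as follows: once the method is synchronous, an `Acknowledge` return value says nothing that a normal return does not already say. The sketch below contrasts the three method shapes under that reading. All names in it (`Ack`, `AsyncSlotReleaser`, `AckSlotReleaser`, `VoidSlotReleaser`) are hypothetical stand-ins, and a `String` replaces `SlotRequestId`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for Flink's Acknowledge: a value that only signals completion. */
final class Ack {
    private static final Ack INSTANCE = new Ack();

    private Ack() {
    }

    static Ack get() {
        return INSTANCE;
    }
}

/** Asynchronous shape: the future tells the caller when the release has happened. */
interface AsyncSlotReleaser {
    CompletableFuture<Ack> releaseSlot(String slotRequestId);
}

/** Synchronous shape whose return value carries no additional information. */
interface AckSlotReleaser {
    Ack releaseSlot(String slotRequestId);
}

/** Synchronous shape asked about in the review: returning normally already means "done". */
interface VoidSlotReleaser {
    void releaseSlot(String slotRequestId);
}

class ReleaseSlotShapesDemo {
    public static void main(String[] args) {
        Set<String> allocated = new HashSet<>(Arrays.asList("request-1", "request-2"));

        // Synchronous release: once the call returns, the slot is no longer tracked.
        VoidSlotReleaser releaser = slotRequestId -> allocated.remove(slotRequestId);
        releaser.releaseSlot("request-1");

        System.out.println(allocated);
    }
}
```

Failures would still be reported through exceptions in the `void` variant, so callers lose nothing by dropping the return value in this sketch.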
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660894#comment-16660894 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227452749

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelection.java ##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on location preference hints.
+ */
+public class LocationPreferenceSlotSelection implements SlotSelectionStrategy {
+
+	public static final LocationPreferenceSlotSelection INSTANCE = new LocationPreferenceSlotSelection();
+
+	private LocationPreferenceSlotSelection() {
+	}
+
+	/**
+	 * Calculates the candidate's locality score.
+	 */
+	private static final BiFunction<Integer, Integer, Integer> LOCALITY_EVALUATION_FUNCTION =
+		(localWeigh, hostLocalWeigh) -> localWeigh * 10 + hostLocalWeigh;
+
+	@Nonnull
+	@Override
+	public SlotInfoAndLocality selectBestSlotForProfile(
+		@Nonnull List<SlotInfo> availableSlots,
+		@Nonnull SlotProfile slotProfile) {
+
+		Collection<TaskManagerLocation> locationPreferences = slotProfile.getPreferredLocations();
+
+		if (availableSlots.isEmpty()) {
+			return SlotInfoAndLocality.of(null, Locality.UNKNOWN);
+		}
+
+		final ResourceProfile resourceProfile = slotProfile.getResourceProfile();
+
+		// if we have no location preferences, we can only filter by the additional requirements.
+		if (locationPreferences.isEmpty()) {
+			for (SlotInfo candidate : availableSlots) {
+				if (candidate.getResourceProfile().isMatching(resourceProfile)) {
+					return SlotInfoAndLocality.of(candidate, Locality.UNCONSTRAINED);
+				}
+			}
+			return SlotInfoAndLocality.of(null, Locality.UNKNOWN);

Review comment:
Factor out in separate method, e.g. `selectBestSlotByResourceProfile`
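The extraction suggested in this comment could look roughly like the sketch below. It is not the code that ended up in the PR: `SimpleSlot` is a hypothetical stand-in for `SlotInfo`, a single core count replaces `ResourceProfile`, "matching" is assumed to mean "offers at least the requested cores", and an `Optional` replaces the `SlotInfoAndLocality.of(null, Locality.UNKNOWN)` result.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Hypothetical, simplified slot: one number stands in for Flink's ResourceProfile. */
final class SimpleSlot {
    final String id;
    final int cpuCores;

    SimpleSlot(String id, int cpuCores) {
        this.id = id;
        this.cpuCores = cpuCores;
    }

    /** Assumed matching rule for this sketch: the slot offers at least the required cores. */
    boolean isMatching(int requiredCores) {
        return cpuCores >= requiredCores;
    }
}

class ResourceProfileSelectionSketch {

    /** The extracted helper: returns the first slot whose resources satisfy the request. */
    static Optional<SimpleSlot> selectBestSlotByResourceProfile(List<SimpleSlot> availableSlots, int requiredCores) {
        for (SimpleSlot candidate : availableSlots) {
            if (candidate.isMatching(requiredCores)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<SimpleSlot> slots = Arrays.asList(new SimpleSlot("s1", 1), new SimpleSlot("s2", 4));
        System.out.println(selectBestSlotByResourceProfile(slots, 2).map(slot -> slot.id).orElse("none"));
    }
}
```

With the loop in its own method, the branch for "no location preferences" in the caller shrinks to a single call, which is the readability gain the comment appears to be after.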
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660887#comment-16660887 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227448691

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java ##
@@ -172,8 +172,8 @@ else if (ret instanceof CompletableFuture) {
 	}

 	@Override
-	public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
-		return CompletableFuture.completedFuture(Acknowledge.get());
+	public Acknowledge cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {

Review comment:
I guess it could have a `void` return type.
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660897#comment-16660897 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227459571

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java ##
@@ -0,0 +1,516 @@
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660900#comment-16660900 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227454366

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java ##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on previous allocations and
+ * falls back to using location preference hints if there is no previous allocation.
+ */
+public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStrategy {
+
+	public static final PreviousAllocationSlotSelectionStrategy INSTANCE =
+		new PreviousAllocationSlotSelectionStrategy();
+
+	private final LocationPreferenceSlotSelection locationPreferenceSlotSelection;
+
+	private PreviousAllocationSlotSelectionStrategy() {
+		this.locationPreferenceSlotSelection = LocationPreferenceSlotSelection.INSTANCE;
+	}
+
+	@Nonnull
+	@Override
+	public SlotInfoAndLocality selectBestSlotForProfile(
+		@Nonnull List<SlotInfo> availableSlots,
+		@Nonnull SlotProfile slotProfile) {
+
+		Collection<AllocationID> priorAllocations = slotProfile.getPreferredAllocations();
+
+		// First, if there was a prior allocation try to schedule to the same/old slot
+		if (!priorAllocations.isEmpty()) {
+
+			HashSet<AllocationID> priorAllocationsSet = new HashSet<>(priorAllocations);

Review comment:
Why are we creating a new `HashSet` here?
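One possible reason for the copy, stated here as an assumption and not as the author's answer, is that `getPreferredAllocations()` returns a plain `Collection`, whose `contains` may be linear, while the slot loop needs one lookup per candidate. The sketch below copies only when the input is not already a `Set`. The class and method names are hypothetical and strings stand in for `AllocationID`.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper illustrating the lookup trade-off; strings replace AllocationID. */
class PriorAllocationLookupSketch {

    /** Reuses the collection if it already is a Set; otherwise copies it for hash-based lookups. */
    static <T> Set<T> asSet(Collection<T> values) {
        if (values instanceof Set) {
            return (Set<T>) values;
        }
        return new HashSet<>(values);
    }

    /** Returns the first available slot id that was allocated before, or null if none matches. */
    static String findPreviouslyAllocated(List<String> availableSlotIds, Collection<String> priorAllocations) {
        Set<String> priorSet = asSet(priorAllocations);
        for (String slotId : availableSlotIds) {
            if (priorSet.contains(slotId)) {
                return slotId;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> available = Arrays.asList("alloc-3", "alloc-7", "alloc-9");
        List<String> prior = Arrays.asList("alloc-7", "alloc-8");
        System.out.println(findPreviouslyAllocated(available, prior));
    }
}
```

If the profile already hands out a `Set`, the copy is pure overhead, which may be what the question is probing.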
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660901#comment-16660901 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227456417

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java ##
@@ -0,0 +1,516 @@
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660891#comment-16660891 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227451887 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelection.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; + +/** + * This class implements a {@link SlotSelectionStrategy} that is based on location preference hints. + */ +public class LocationPreferenceSlotSelection implements SlotSelectionStrategy { Review comment: Singletons are best implemented using an `enum`: ``` enum LocationPreferenceSlotSelection implements SlotSelectionStrategy { INSTANCE; @Override public SlotInfoAndLocality selectBestSlotForProfile( } ```
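The enum-based singleton suggested in the review comment can be sketched in a self-contained form. The interface `SelectionStrategySketch` and the enum `FirstCandidateSelection` are hypothetical stand-ins for `SlotSelectionStrategy` and its implementations; the selection logic is deliberately trivial and is not what the pull request implements.

```java
import java.util.List;

/** Hypothetical stand-in for a strategy interface such as SlotSelectionStrategy. */
interface SelectionStrategySketch {
	String selectBest(List<String> candidates);
}

/**
 * Enum singleton: the JVM guarantees exactly one INSTANCE, and the type
 * can still implement an interface like an ordinary class.
 */
enum FirstCandidateSelection implements SelectionStrategySketch {
	INSTANCE;

	@Override
	public String selectBest(List<String> candidates) {
		// Trivial placeholder policy: take the first candidate, if any.
		return candidates.isEmpty() ? null : candidates.get(0);
	}
}
```

Compared with a `public static final` field plus a private constructor, the enum form needs no extra code to stay a singleton under serialization or reflection.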
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660898#comment-16660898 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227453757 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.SlotInfo; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * This class implements a {@link SlotSelectionStrategy} that is based on previous allocations and + * falls back to using location preference hints if there is no previous allocation. + */ +public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStrategy { Review comment: Same comment concerning singletons and using an `enum` for that.
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660899#comment-16660899 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227455130 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmaster.SlotInfo; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * This class implements a {@link SlotSelectionStrategy} that is based on previous allocations and + * falls back to using location preference hints if there is no previous allocation. + */ +public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStrategy { + + public static final PreviousAllocationSlotSelectionStrategy INSTANCE = + new PreviousAllocationSlotSelectionStrategy(); + + private final LocationPreferenceSlotSelection locationPreferenceSlotSelection; + + private PreviousAllocationSlotSelectionStrategy() { + this.locationPreferenceSlotSelection = LocationPreferenceSlotSelection.INSTANCE; + } + + @Nonnull + @Override + public SlotInfoAndLocality selectBestSlotForProfile( + @Nonnull List availableSlots, + @Nonnull SlotProfile slotProfile) { + + Collection priorAllocations = slotProfile.getPreferredAllocations(); + + // First, if there was a prior allocation try to schedule to the same/old slot + if (!priorAllocations.isEmpty()) { + + HashSet priorAllocationsSet = new HashSet<>(priorAllocations); + for (SlotInfo availableSlot : availableSlots) { + if (priorAllocationsSet.contains(availableSlot.getAllocationId())) { + return SlotInfoAndLocality.of(availableSlot, Locality.LOCAL); + } + } + } + + // Second, select based on location preference, excluding blacklisted allocations + Set blackListedAllocations = slotProfile.getPreviousExecutionGraphAllocations(); + if (blackListedAllocations.isEmpty()) { + return 
locationPreferenceSlotSelection.selectBestSlotForProfile(availableSlots, slotProfile); + } else { + ArrayList availableAndAllowedSlots = new ArrayList<>(availableSlots.size()); + for (SlotInfo availableSlot : availableSlots) { + if (!blackListedAllocations.contains(availableSlot.getAllocationId())) { + availableAndAllowedSlots.add(availableSlot); + } + } + return locationPreferenceSlotSelection.selectBestSlotForProfile(availableAndAllowedSlots, slotProfile); + } Review comment: I would move `locationPreferenceSlotSelection.selectBestSlotForProfile(availableAndAllowedSlots, slotProfile)` out of the if-else blocks and only do the filtering of `availableSlots` in the branches. This reduces code duplication a bit.
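The refactoring proposed in this review comment can be sketched as follows. This is a simplified, hypothetical version: slots are represented by their allocation IDs as plain strings, and `delegateSelect` stands in for the location-preference strategy; none of these names come from the pull request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: the branches only filter, the delegate call happens once. */
final class BlacklistFilterSketch {

	static String selectBest(List<String> availableSlots, Set<String> blackListedAllocations) {
		final List<String> candidates;
		if (blackListedAllocations.isEmpty()) {
			// Nothing to exclude, so the input list is used as-is.
			candidates = availableSlots;
		} else {
			List<String> allowed = new ArrayList<>(availableSlots.size());
			for (String slot : availableSlots) {
				if (!blackListedAllocations.contains(slot)) {
					allowed.add(slot);
				}
			}
			candidates = allowed;
		}
		// Single call site for the delegate strategy.
		return delegateSelect(candidates);
	}

	/** Placeholder for the delegate strategy: picks the first candidate, if any. */
	private static String delegateSelect(List<String> candidates) {
		return candidates.isEmpty() ? null : candidates.get(0);
	}
}
```

With the delegate call outside the branches, a later change to how the delegate is invoked only has to be made in one place.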
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660839#comment-16660839 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227447242 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ## @@ -1676,6 +1684,35 @@ public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) { } } + /** +* Computes and returns a set with the prior allocation ids from all execution vertices in the graph. +*/ + private Set computeAllPriorAllocationIds() { + HashSet allPreviousAllocationIds = new HashSet<>(); + Iterable ejvIterable = getVerticesTopologically(); + for (ExecutionJobVertex executionJobVertex : ejvIterable) { + for (ExecutionVertex executionVertex : executionJobVertex.getTaskVertices()) { Review comment: `getAllExecutionVertices` should be easier to use here.
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660838#comment-16660838 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227446797 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ## @@ -1676,6 +1684,35 @@ public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) { } } + /** +* Computes and returns a set with the prior allocation ids from all execution vertices in the graph. +*/ + private Set computeAllPriorAllocationIds() { + HashSet allPreviousAllocationIds = new HashSet<>(); Review comment: Initialize `HashSet` with `getNumberOfExecutionJobVertices()`
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660835#comment-16660835 ] ASF GitHub Bot commented on FLINK-10431: StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227446372 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -450,6 +454,7 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { SlotProvider slotProvider, boolean queued, LocationPreferenceConstraint locationPreferenceConstraint, + @Nullable Set allPreviousExecutionGraphAllocationIds, Review comment: Will do. Overall this construct that uses `null` will go away once we introduce group scheduling.
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660836#comment-16660836 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227446397 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ## @@ -939,13 +942,18 @@ public void scheduleForExecution() throws JobException { // collecting all the slots may resize and fail in that operation without slots getting lost final ArrayList> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices()); + // this is a (temporary) to avoid collecting the previous allocations for all executions again and again Review comment: noun missing in the comment
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660833#comment-16660833 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227445757 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -450,6 +454,7 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { SlotProvider slotProvider, boolean queued, LocationPreferenceConstraint locationPreferenceConstraint, + @Nullable Set allPreviousExecutionGraphAllocationIds, Review comment: Please update the JavaDocs stating what `null` for this parameter means.
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660827#comment-16660827 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227444801 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -450,6 +454,7 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { SlotProvider slotProvider, boolean queued, LocationPreferenceConstraint locationPreferenceConstraint, + @Nullable Set allPreviousExecutionGraphAllocationIds, Review comment: Same here with `@Nullable`
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660829#comment-16660829 ] ASF GitHub Bot commented on FLINK-10431: StefanRRichter commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227445579 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -385,7 +386,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { return scheduleForExecution( resourceProvider, allowQueued, - LocationPreferenceConstraint.ANY); + LocationPreferenceConstraint.ANY, + null); Review comment: The point is that `null` and `emptySet` have different meanings here: `null` means it was not previously computed/provided and has to be determined inside the execution. `emptySet` means it has been provided and the result was just empty. Overall, this whole construct should go away very soon when we move to group scheduling.
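The distinction between `null` ("not provided, compute it") and an empty set ("provided, and there was nothing") can be made concrete with a short sketch. The class `PreviousAllocationsSketch`, the method `resolve`, and the use of strings for allocation IDs are all hypothetical; this is not how `Execution` is implemented in the pull request.

```java
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical sketch of the null-versus-empty-set contract from the discussion. */
final class PreviousAllocationsSketch {

	/** Counts how often the fallback computation actually ran. */
	static int computations = 0;

	/**
	 * @param provided previously computed allocation ids; null means "not computed yet",
	 *                 an empty set means "computed, and there were none"
	 * @param computeFallback computation to run only when nothing was provided
	 */
	static Set<String> resolve(Set<String> provided, Supplier<Set<String>> computeFallback) {
		if (provided == null) {
			computations++;
			return computeFallback.get();
		}
		// An empty set is a valid answer and must not trigger recomputation.
		return provided;
	}
}
```

An `Optional<Set<String>>` parameter would be one way to express the "not provided" case without `null`, which is roughly the concern raised in the review; the thread itself only says the construct is expected to disappear with group scheduling.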
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660828#comment-16660828 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r227444632 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -402,13 +404,15 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { public CompletableFuture scheduleForExecution( SlotProvider slotProvider, boolean queued, - LocationPreferenceConstraint locationPreferenceConstraint) { + LocationPreferenceConstraint locationPreferenceConstraint, + @Nullable Set allPreviousExecutionGraphAllocationIds) { Review comment: Let's try not to pass in null arguments
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660830#comment-16660830 ] ASF GitHub Bot commented on FLINK-10431: tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler URL: https://github.com/apache/flink/pull/6898#discussion_r22705 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -385,7 +386,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) { return scheduleForExecution( resourceProvider, allowQueued, - LocationPreferenceConstraint.ANY); + LocationPreferenceConstraint.ANY, + null); Review comment: Let's not use `null` but instead `Collections.emptySet()`
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660824#comment-16660824 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227444801

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ##
@@ -450,6 +454,7 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
             SlotProvider slotProvider,
             boolean queued,
             LocationPreferenceConstraint locationPreferenceConstraint,
+            @Nullable Set allPreviousExecutionGraphAllocationIds,

Review comment:
Same here with `@Nullable`
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660823#comment-16660823 ]

ASF GitHub Bot commented on FLINK-10431:

tillrohrmann commented on a change in pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227444632

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ##
@@ -402,13 +404,15 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
     public CompletableFuture scheduleForExecution(
             SlotProvider slotProvider,
             boolean queued,
-            LocationPreferenceConstraint locationPreferenceConstraint) {
+            LocationPreferenceConstraint locationPreferenceConstraint,
+            @Nullable Set allPreviousExecutionGraphAllocationIds) {

Review comment:
Let's try not to pass in null arguments
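The review comments above all ask for the same thing: callers should pass an empty set rather than `null`, so the parameter no longer needs `@Nullable`. A minimal sketch of that idea follows. The class `NullFreeScheduling` and the method `isPreviouslyAllocated` are hypothetical names chosen for illustration, and plain `String` ids stand in for Flink's allocation ids; none of this is the code in the PR.

```java
import java.util.Collections;
import java.util.Objects;
import java.util.Set;

// Illustrative only: hypothetical names, not the Flink API.
final class NullFreeScheduling {

    // The set parameter is never null, so the body needs no null branch.
    static boolean isPreviouslyAllocated(String allocationId, Set<String> previousAllocationIds) {
        Objects.requireNonNull(previousAllocationIds, "pass Collections.emptySet() instead of null");
        return previousAllocationIds.contains(allocationId);
    }

    // Overload for callers that have no previous allocations to report.
    static boolean isPreviouslyAllocated(String allocationId) {
        return isPreviouslyAllocated(allocationId, Collections.emptySet());
    }

    public static void main(String[] args) {
        System.out.println(isPreviouslyAllocated("alloc-1"));
        System.out.println(isPreviouslyAllocated("alloc-1", Collections.singleton("alloc-1")));
    }
}
```

With an empty set as the "nothing to report" value, every call site reads the same way and an accidental `null` fails fast at the method boundary instead of somewhere deeper in the scheduling path.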
[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659365#comment-16659365 ]

ASF GitHub Bot commented on FLINK-10431:

StefanRRichter opened a new pull request #6898: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898

## What is the purpose of the change

This PR extracts the scheduling-related code (e.g. slot sharing logic) from the slot pool into a preliminary version of a future scheduler component. Our primary goal is fixing the scheduling logic for local recovery. Changes in this PR open up potential for more code cleanups (e.g. removing all scheduling concerns from the slot pool, removing `ProviderAndOwner`, moving away from some `CompletableFuture` return types, etc.). This cleanup and some test rewrites will happen in a followup PR.

## Brief change log

- SlotPool is no longer an `RpcEndpoint`; we now need to take care that all state modification happens in the component's main thread.
- Introduced `SlotInfo` and moved the slot sharing code into a scheduler component. Slot pool code can now deal with single slot requests. The pattern of interaction is more explicit; we have 3 main new methods: `getAvailableSlotsInformation` to list available slots, `allocateAvailableSlot` to allocate a listed / available slot, and `requestNewAllocatedSlot` to request a new slot from the resource manager. The old code paths currently still co-exist in the slot pool and will be removed in followup work.
- Introduced the creation of a collection of all previous allocations through `ExecutionGraph::computeAllPriorAllocationIds`. This serves as the basis to compute a "blacklist" of allocation ids that we use to fix the scheduling of local recovery.
- Provided an improved version of the scheduling for local recovery that uses a blacklist.
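The change log above describes a list-then-allocate interaction between scheduler and slot pool, plus a blacklist of prior allocation ids for local recovery. The following is a rough sketch of how those pieces might fit together, not the PR's implementation. `ToySlotPool`, `schedule`, and the use of `String` ids are assumptions made for illustration; only the method names `getAvailableSlotsInformation` and `allocateAvailableSlot` come from the description, and their signatures here are invented. The blacklist rule shown (a task may reclaim its own previous slot, but may not take a slot previously held by another task) is one plausible reading of the description.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Illustrative only: a toy model with invented signatures, not Flink code.
final class SlotPoolSketch {

    /** Toy slot pool that only holds free slots and hands them out one at a time. */
    static final class ToySlotPool {
        private final Set<String> available = new LinkedHashSet<>();

        void offerSlot(String allocationId) {
            available.add(allocationId);
        }

        /** Lists the currently free slots (a snapshot copy). */
        List<String> getAvailableSlotsInformation() {
            return new ArrayList<>(available);
        }

        /** Hands out a listed slot; empty if the slot is no longer free. */
        Optional<String> allocateAvailableSlot(String allocationId) {
            return available.remove(allocationId) ? Optional.of(allocationId) : Optional.empty();
        }
    }

    /**
     * Scheduler side. Assumes it runs in the same thread as the pool, so the
     * listed slots cannot change between listing and allocating.
     */
    static Optional<String> schedule(
            ToySlotPool pool,
            Optional<String> previousAllocationId,
            Set<String> allPreviousAllocationIds) {
        List<String> candidates = pool.getAvailableSlotsInformation();
        // 1. Local recovery: prefer the slot this task occupied before.
        if (previousAllocationId.isPresent() && candidates.contains(previousAllocationId.get())) {
            return pool.allocateAvailableSlot(previousAllocationId.get());
        }
        // 2. Otherwise take a free slot that no other task may want to reclaim.
        for (String candidate : candidates) {
            if (!allPreviousAllocationIds.contains(candidate)) {
                return pool.allocateAvailableSlot(candidate);
            }
        }
        // 3. Nothing suitable; a real scheduler would now request a new slot.
        return Optional.empty();
    }

    public static void main(String[] args) {
        ToySlotPool pool = new ToySlotPool();
        pool.offerSlot("a");
        pool.offerSlot("b");
        pool.offerSlot("c");
        Set<String> allPrevious = new HashSet<>(Arrays.asList("a", "b"));

        // A task without a previous slot skips the blacklisted "a" and "b".
        System.out.println(schedule(pool, Optional.empty(), allPrevious));
        // The tasks that ran on "b" and "a" can still reclaim them.
        System.out.println(schedule(pool, Optional.of("b"), allPrevious));
        System.out.println(schedule(pool, Optional.of("a"), allPrevious));
    }
}
```

Without the blacklist, the first task in `main` would take slot "a" and the task that previously ran there would lose its local state, which is the kind of outcome the improved local-recovery scheduling is meant to avoid.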
## Verifying this change

This change is already covered by existing tests, but we still need to rewrite the tests for the slot pool and add more tests in followup work.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)