[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-12-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16712632#comment-16712632
 ] 

ASF GitHub Bot commented on FLINK-10431:


StefanRRichter commented on issue #6898: [FLINK-10431] Extraction of 
scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#issuecomment-445192951
 
 
   @Clark The background is that it makes things easier. Otherwise you have 
concurrency between two components that want to interact in transactional 
ways: if the scheduler runs in a different thread than the slot pool, there can 
be concurrent modifications to the slot pool (e.g. slots added/removed) between 
the scheduler asking for the available slots and the scheduler requesting one 
of those slots. All of this has to be resolved, and the code becomes harder to 
understand and reason about. This can be avoided if the scheduler and the slot 
pool run in the same thread, and we are also aiming to have all modifications 
to the execution graph happen in that same thread. The threading model would 
then be that blocking or expensive operations run in their own threads so that 
the main thread is never blocked, but the results are always synced back to the 
main thread, which runs all the modifications to the scheduler, slot pool, 
execution graph, etc.
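
The threading model described above could be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from the PR: `MainThreadModelSketch`, its string-based slots, and `expensivePreparation` are hypothetical names, and a plain single-threaded `ExecutorService` stands in for the main thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: all state changes are confined to one "main" thread. */
final class MainThreadModelSketch {
    // Single-threaded executor playing the role of the main thread.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    // Separate pool for blocking or expensive work, so the main thread never blocks.
    private final ExecutorService ioPool = Executors.newFixedThreadPool(2);
    // Unsynchronized state: safe only because it is touched from mainThread alone.
    private final List<String> availableSlots = new ArrayList<>();

    /** Adds a slot; the modification itself runs in the main thread. */
    CompletableFuture<Void> addSlot(String slotId) {
        return CompletableFuture.runAsync(() -> availableSlots.add(slotId), mainThread);
    }

    /** Runs the expensive part elsewhere, then syncs the result back to the main thread. */
    CompletableFuture<String> scheduleTask(String task) {
        return CompletableFuture
            .supplyAsync(() -> expensivePreparation(task), ioPool)
            .thenApplyAsync(prepared -> {
                // Lookup and removal happen in one main-thread step, so no slot
                // can be added or removed in between.
                if (availableSlots.isEmpty()) {
                    return prepared + " -> no slot";
                }
                return prepared + " -> " + availableSlots.remove(0);
            }, mainThread);
    }

    // Placeholder for a blocking or expensive operation (assumption for illustration).
    private static String expensivePreparation(String task) {
        return "prepared(" + task + ")";
    }

    void shutdown() {
        ioPool.shutdown();
        mainThread.shutdown();
    }
}
```

Because the check for an available slot and its removal run as a single step in the main thread, the "ask, then request" race described above cannot occur in this sketch, and no locks are needed.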


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extract scheduling-related code from SlotPool
> ---------------------------------------------
>
> Key: FLINK-10431
> URL: https://issues.apache.org/jira/browse/FLINK-10431
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination
> Affects Versions: 1.7.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Labels: pull-request-available
>
> The other half of the current scheduling logic is the management of slot 
> sharing and is located in the SlotPool. We need to extract this logic into 
> our new Scheduler component from the previous step. This leaves us with a 
> simpler SlotPool that mainly cares about obtaining, holding, and releasing 
> slots in interaction with a ResourceManager. The new Scheduler can now 
> identify slot sharing groups and interact with the SlotPool.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-11-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704330#comment-16704330
 ] 

ASF GitHub Bot commented on FLINK-10431:


Clark commented on issue #6898: [FLINK-10431] Extraction of 
scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#issuecomment-443109202
 
 
   Hi @StefanRRichter, I am just wondering why SlotPool should no longer be 
an RpcEndpoint?




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663665#comment-16663665
 ] 

ASF GitHub Bot commented on FLINK-10431:


StefanRRichter commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228148052
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
 ##
 @@ -312,7 +313,7 @@ public void validateRunsInMainThread() {
/**
 * Executor which executes runnables in the main thread context.
 */
-   protected static class MainThreadExecutor implements Executor {
+   protected static class MainThreadExecutor implements 
MainThreadExecutable {
 
 Review comment:
   I was also thinking about that. Are you suggesting replacing the current 
methods with the ones from `ScheduledExecutor`, or adding them on top of the 
existing methods?




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663662#comment-16663662
 ] 

ASF GitHub Bot commented on FLINK-10431:


StefanRRichter commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228147167
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
 ##
 @@ -145,17 +149,61 @@
 * If the returned future must not be completed right away (a.k.a. 
the slot request
 * can be queued), allowQueuedScheduling must be set to true.
 *
+* @deprecated this method will be removed once the handling of slot 
sharing is completely extracted from the slot
+* pool into a dedicated {@link Scheduler} component. The call is then 
replaced by calls to
+* {@link #getAvailableSlotsInformation()}, {@link 
#allocateAvailableSlot(SlotRequestId, AllocationID)}, and
+* {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, 
Time)}.
+*
 * @param slotRequestId identifying the requested slot
 * @param scheduledUnit for which to allocate slot
 * @param slotProfile profile that specifies the requirements for the 
requested slot
 * @param allowQueuedScheduling true if the slot request can be queued 
(e.g. the returned future must not be completed)
 * @param timeout for the operation
 * @return Future which is completed with the allocated {@link 
LogicalSlot}
 */
+   @Deprecated
CompletableFuture<LogicalSlot> allocateSlot(
SlotRequestId slotRequestId,
ScheduledUnit scheduledUnit,
SlotProfile slotProfile,
boolean allowQueuedScheduling,
@RpcTimeout Time timeout);
+
+   /**
+* Returns a list of {@link SlotInfo} objects about all slots that are 
currently available in the slot
+* pool.
+*
+* @return a list of {@link SlotInfo} objects about all slots that are 
currently available in the slot pool.
+*/
+   @Nonnull
+   List<SlotInfo> getAvailableSlotsInformation();
+
+   /**
+* Allocates the available slot with the given allocation id under the 
given request id. This method returns
+* {@code null} if no slot with the given allocation id is available.
+*
+* @param slotRequestId identifying the requested slot
+* @param allocationID the allocation id of the requested available slot
+* @return the previously available slot with the given allocation id 
or {@code null} if no such slot existed.
+*/
+   @Nullable
+   AllocatedSlot allocateAvailableSlot(
 
 Review comment:
   In general, I like the idea, but I wonder how this is easily possible? For 
these methods, I agree, but the interface also has `requestNewAllocatedSlot`, 
which also exposes `AllocatedSlot` and is typically followed by attempts to 
assign a payload.
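
For orientation, the call sequence named in the `@deprecated` note above (query the available slots, try to allocate one, otherwise request a new one) might look roughly like this. It is a simplified sketch, not the PR's implementation: `SimpleSlotPool` is a hypothetical stand-in that uses plain strings in place of `SlotRequestId`, `AllocationID`, `SlotInfo`, and `AllocatedSlot`, and it omits the resource profile and timeout parameters.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

final class SlotPoolUsageSketch {

    /** Hypothetical, simplified slot pool; all identifiers are plain strings. */
    static final class SimpleSlotPool {
        private final Set<String> available = new LinkedHashSet<>();
        private final Map<String, String> allocated = new HashMap<>(); // requestId -> allocationId

        void offerSlot(String allocationId) {
            available.add(allocationId);
        }

        /** Snapshot of the currently available slots. */
        List<String> getAvailableSlotsInformation() {
            return new ArrayList<>(available);
        }

        /** Returns the slot, or null if it is not available (anymore). */
        String allocateAvailableSlot(String requestId, String allocationId) {
            if (!available.remove(allocationId)) {
                return null;
            }
            allocated.put(requestId, allocationId);
            return allocationId;
        }

        /** Stand-in for asking the resource manager; completes immediately here. */
        CompletableFuture<String> requestNewAllocatedSlot(String requestId) {
            String allocationId = "new-" + requestId;
            allocated.put(requestId, allocationId);
            return CompletableFuture.completedFuture(allocationId);
        }
    }

    /** Scheduler-side logic: prefer an available slot, otherwise request a new one. */
    static CompletableFuture<String> allocate(SimpleSlotPool pool, String requestId) {
        for (String candidate : pool.getAvailableSlotsInformation()) {
            String slot = pool.allocateAvailableSlot(requestId, candidate);
            if (slot != null) { // null means the slot vanished in the meantime
                return CompletableFuture.completedFuture(slot);
            }
        }
        return pool.requestNewAllocatedSlot(requestId);
    }
}
```

The `null` check reflects the `@Nullable` contract in the diff: a slot listed as available may no longer be there by the time it is allocated.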




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663661#comment-16663661
 ] 

ASF GitHub Bot commented on FLINK-10431:


StefanRRichter commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228147167
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
 ##
 @@ -145,17 +149,61 @@
 * If the returned future must not be completed right away (a.k.a. 
the slot request
 * can be queued), allowQueuedScheduling must be set to true.
 *
+* @deprecated this method will be removed once the handling of slot 
sharing is completely extracted from the slot
+* pool into a dedicated {@link Scheduler} component. The call is then 
replaced by calls to
+* {@link #getAvailableSlotsInformation()}, {@link 
#allocateAvailableSlot(SlotRequestId, AllocationID)}, and
+* {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, 
Time)}.
+*
 * @param slotRequestId identifying the requested slot
 * @param scheduledUnit for which to allocate slot
 * @param slotProfile profile that specifies the requirements for the 
requested slot
 * @param allowQueuedScheduling true if the slot request can be queued 
(e.g. the returned future must not be completed)
 * @param timeout for the operation
 * @return Future which is completed with the allocated {@link 
LogicalSlot}
 */
+   @Deprecated
CompletableFuture<LogicalSlot> allocateSlot(
SlotRequestId slotRequestId,
ScheduledUnit scheduledUnit,
SlotProfile slotProfile,
boolean allowQueuedScheduling,
@RpcTimeout Time timeout);
+
+   /**
+* Returns a list of {@link SlotInfo} objects about all slots that are 
currently available in the slot
+* pool.
+*
+* @return a list of {@link SlotInfo} objects about all slots that are 
currently available in the slot pool.
+*/
+   @Nonnull
+   List<SlotInfo> getAvailableSlotsInformation();
+
+   /**
+* Allocates the available slot with the given allocation id under the 
given request id. This method returns
+* {@code null} if no slot with the given allocation id is available.
+*
+* @param slotRequestId identifying the requested slot
+* @param allocationID the allocation id of the requested available slot
+* @return the previously available slot with the given allocation id 
or {@code null} if no such slot existed.
+*/
+   @Nullable
+   AllocatedSlot allocateAvailableSlot(
 
 Review comment:
   In general, I like the idea, but I wonder how this is easily possible? For 
these methods, I agree, but the interface also has `requestNewAllocatedSlot`, 
which also exposes `AllocatedSlot` and is typically followed by attempts to 
assign a payload.




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663506#comment-16663506
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228105187
 
 

 ##
 File path: flink-tests/src/test/resources/log4j-test.properties
 ##
 @@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=DEBUG, testlogger
 
 Review comment:
   Please revert




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663512#comment-16663512
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228103506
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java
 ##
 @@ -315,6 +315,11 @@ public int getPhysicalSlotNumber() {
return 0;
}
 
+   @Override
+   public ResourceProfile getResourceProfile() {
+   throw new UnsupportedOperationException("Not 
implemented");
 
 Review comment:
   Could the return value be `ResourceProfile#UNKNOWN`?
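
To illustrate the difference this would make, here is a small sketch with hypothetical stand-in types (`Profile`, `SlotContext`, and both stubs are assumptions for illustration, not Flink classes): a stub that returns a sentinel keeps working for callers that merely read the profile, while a throwing stub fails as soon as the method is touched.

```java
final class ResourceProfileStubSketch {

    /** Hypothetical stand-in for a resource profile with an UNKNOWN sentinel. */
    static final class Profile {
        static final Profile UNKNOWN = new Profile(-1);
        final int memoryMb;

        Profile(int memoryMb) {
            this.memoryMb = memoryMb;
        }
    }

    /** Hypothetical interface implemented by the test stubs below. */
    interface SlotContext {
        Profile getResourceProfile();
    }

    /** Stub in the style of the diff: fails whenever the profile is read. */
    static final class ThrowingStub implements SlotContext {
        @Override
        public Profile getResourceProfile() {
            throw new UnsupportedOperationException("Not implemented");
        }
    }

    /** Stub in the style suggested by the review: returns a sentinel instead. */
    static final class UnknownStub implements SlotContext {
        @Override
        public Profile getResourceProfile() {
            return Profile.UNKNOWN;
        }
    }

    /** Example caller that only reads the profile, e.g. for logging. */
    static String describe(SlotContext context) {
        Profile profile = context.getResourceProfile();
        return profile == Profile.UNKNOWN ? "unknown" : profile.memoryMb + " MB";
    }
}
```

Which behavior is preferable depends on whether the test is meant to prove that the profile is never read; the throwing stub does that, the sentinel does not.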




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663507#comment-16663507
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228104451
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
 ##
 @@ -404,7 +392,7 @@ public void testReleaseResource() throws Exception {
 */
@Test
public void testSlotRequestCancellationUponFailingRequest() throws 
Exception {
-   final SlotPool slotPool = new SlotPool(rpcService, jobId, 
LocationPreferenceSchedulingStrategy.getInstance());
+   final SlotPool slotPool = new SlotPool( jobId, 
LocationPreferenceSchedulingStrategy.getInstance());
 
 Review comment:
   whitespace after `(`




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663518#comment-16663518
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228104159
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
 ##
 @@ -1,428 +1,428 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmaster.slotpool;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.SlotProfile;
-import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
-import org.apache.flink.runtime.jobmanager.scheduler.DummyScheduledUnit;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.JobMasterId;
-import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.jobmaster.SlotRequestId;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
-import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
-import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.util.clock.Clock;
-import org.apache.flink.runtime.util.clock.SystemClock;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorSystem;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import javax.annotation.Nullable;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Consumer;
-
-import static 
org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for the SlotPool using a proper RPC setup.
- */
-public class SlotPoolRpcTest extends TestLogger {
-
-   private static RpcService rpcService;
-
-   private static final Time timeout = Time.seconds(10L);
-
-   private static final Time fastTimeout = Time.milliseconds(1L);
-
-   // 

-   //  setup
-   // 

-
-   @BeforeClass
-   public static void setup() {
-   ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new 
Configuration());
-   rpcService = new AkkaRpcService(actorSystem, Time.seconds(10));
-   }
-
-   @AfterClass
-   public static  void shutdown() throws InterruptedException, 
ExecutionException, TimeoutException {
-   if (rpcService != null) {
-   RpcUtils.terminateRpcService(rpcService, timeout);
-   rpcService = null;
-   }
-   

[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663511#comment-16663511
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228101887
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
 ##
 @@ -107,6 +107,7 @@ public void testSlotReleaseOnFailedResourceAssignment() 
throws Exception {
slotProvider,
false,
LocationPreferenceConstraint.ALL,
+   null,
 
 Review comment:
   I think we need to update this test and pass in `Collections.emptySet()`.
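
A minimal sketch of why an empty set is safer than `null` for a set-typed argument. The method and parameter names below are hypothetical and not taken from the PR; the sketch only assumes that the callee iterates over the set it is given.

```java
import java.util.Set;

final class EmptySetArgumentSketch {

    /**
     * Hypothetical callee: counts how many of the given ids are currently free.
     * It iterates over its argument without a null check, as much code does.
     */
    static int countFree(Set<String> freeIds, Set<String> requestedIds) {
        int count = 0;
        for (String id : requestedIds) { // throws NullPointerException if requestedIds is null
            if (freeIds.contains(id)) {
                count++;
            }
        }
        return count;
    }
}
```

With `Collections.emptySet()` the loop simply does nothing and the call returns 0, whereas passing `null` only works as long as the callee never touches the argument.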




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663513#comment-16663513
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228102058
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
 ##
 @@ -71,7 +71,7 @@ public void testSlotReleasedWhenScheduledImmediately() {
 
assertEquals(ExecutionState.CREATED, 
vertex.getExecutionState());
// try to deploy to the slot
-   vertex.scheduleForExecution(scheduler, false, 
LocationPreferenceConstraint.ALL);
+   vertex.scheduleForExecution(scheduler, false, 
LocationPreferenceConstraint.ALL, null);
 
 Review comment:
   `Collections.emptySet()`




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663517#comment-16663517
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228102608
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
 ##
 @@ -111,7 +112,8 @@ public void testScheduleImmediately() throws Exception {
 
assertEquals(5, 
testingSlotProvider.getNumberOfAvailableSlots());
}
-   
+
+   @Ignore
 
 Review comment:
   Why do we ignore this test?




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663509#comment-16663509
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228102113
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
 ##
 @@ -157,7 +157,7 @@ public void testMultiRegionsFailover() throws Exception {
 
assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
 
-   ev21.scheduleForExecution(slotProvider, true, LocationPreferenceConstraint.ALL);
+   ev21.scheduleForExecution(slotProvider, true, LocationPreferenceConstraint.ALL, null);
 
 Review comment:
   `Collections.emptySet()`
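
   A minimal sketch of why an empty set is preferable to `null` here. It assumes the new fourth argument is set-typed; `countPreviousAllocations` and the `String` element type are hypothetical stand-ins, not the actual Flink signature.

```java
import java.util.Collections;
import java.util.Set;

final class EmptySetArgumentSketch {

    // Hypothetical stand-in for a callee that receives a set-typed argument.
    // Calling size() on a null argument would throw a NullPointerException.
    static int countPreviousAllocations(Set<String> previousAllocationIds) {
        return previousAllocationIds.size();
    }

    public static void main(String[] args) {
        // An empty set expresses "no previous allocations" without null checks.
        System.out.println(countPreviousAllocations(Collections.emptySet()));
    }
}
```

   With an empty set the callee can iterate or query the argument directly, so no defensive null handling is needed in production code just to satisfy tests.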




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663508#comment-16663508
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228105242
 
 

 ##
 File path: flink-tests/src/test/resources/log4j-test.properties
 ##
 @@ -29,3 +29,4 @@ log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 # suppress the irrelevant (wrong) warnings from the netty channel handler
 log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR
 log4j.logger.org.apache.zookeeper=OFF
+log4j.logger.org.apache.flink.runtime.jobmaster.slotpool=DEBUG
 
 Review comment:
   Please revert




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663514#comment-16663514
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228101983
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
 ##
 @@ -454,7 +454,7 @@ public void testScheduleOrDeployAfterCancel() {
// it can occur as the result of races
{
Scheduler scheduler = mock(Scheduler.class);
-   vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL);
+   vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL, null);
 
 Review comment:
   Same here with the test.




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663515#comment-16663515
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228104545
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestMainThreadExecutor.java
 ##
 @@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TestMainThreadExecutor implements MainThreadExecutable {
 
 Review comment:
   JavaDocs missing.




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663510#comment-16663510
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228102692
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
 ##
 @@ -817,7 +818,8 @@ public void testSequentialAllocateAndRelease() {
fail(e.getMessage());
}
}
-   
+
+   @Ignore
 
 Review comment:
   Why `@Ignore`?




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663516#comment-16663516
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228105123
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestMainThreadExecutor.java
 ##
 @@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TestMainThreadExecutor implements MainThreadExecutable {
+
+   private final ScheduledExecutorService scheduledExecutor;
+
+   public TestMainThreadExecutor(ScheduledExecutorService 
scheduledExecutor) {
+   this.scheduledExecutor = scheduledExecutor;
+   }
+
+   public TestMainThreadExecutor() {
+   this(Executors.newSingleThreadScheduledExecutor());
 
 Review comment:
   Shouldn't this class be an `ExecutorService` to close the created `Executor` 
properly? Otherwise we might run into problems when running tests using this 
`TestMainThreadExecutor` in a loop, for example.
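
   A minimal sketch of the concern, under stated assumptions: instead of implementing the full `ExecutorService` interface, this hypothetical `ClosableTestExecutorSketch` implements `AutoCloseable`, which is a lighter swapped-in technique that still lets a test release the thread it created. The class and its methods are illustrative and not part of the pull request.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class ClosableTestExecutorSketch implements AutoCloseable {

    private final ScheduledExecutorService scheduledExecutor;

    ClosableTestExecutorSketch() {
        // The executor is created here, so this class is responsible for closing it.
        this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
    }

    void runAsync(Runnable runnable) {
        scheduledExecutor.execute(runnable);
    }

    boolean isShutdown() {
        return scheduledExecutor.isShutdown();
    }

    @Override
    public void close() {
        // Stop accepting work, wait briefly for queued tasks, then force termination.
        scheduledExecutor.shutdown();
        try {
            if (!scheduledExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                scheduledExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduledExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Creating the executor in a loop no longer leaks one thread per iteration.
        for (int i = 0; i < 3; i++) {
            try (ClosableTestExecutorSketch executor = new ClosableTestExecutorSketch()) {
                executor.runAsync(() -> { });
            }
        }
    }
}
```

   Without some form of shutdown, each instance created through the no-argument constructor would keep a non-daemon thread alive for the remainder of the test JVM.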




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663489#comment-16663489
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228100910
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ##
 @@ -324,78 +297,98 @@ public void disconnectResourceManager() {
boolean allowQueuedScheduling,
Time allocationTimeout) {
 
-   log.debug("Received slot request [{}] for task: {}", 
slotRequestId, task.getTaskToExecute());
-
-   final SlotSharingGroupId slotSharingGroupId = 
task.getSlotSharingGroupId();
-
-   if (slotSharingGroupId != null) {
-   // allocate slot with slot sharing
-   final SlotSharingManager multiTaskSlotManager = 
slotSharingManagers.computeIfAbsent(
-   slotSharingGroupId,
-   id -> new SlotSharingManager(
-   id,
-   this,
-   providerAndOwner));
-
-   final SlotSharingManager.MultiTaskSlotLocality 
multiTaskSlotLocality;
-
-   try {
-   if (task.getCoLocationConstraint() != null) {
-   multiTaskSlotLocality = 
allocateCoLocatedMultiTaskSlot(
-   task.getCoLocationConstraint(),
-   multiTaskSlotManager,
-   slotProfile,
-   allowQueuedScheduling,
-   allocationTimeout);
+   return 
CompletableFuture.completedFuture(null).thenComposeAsync((i) -> {
 
 Review comment:
   Stefan and I had an offline discussion and concluded that the porting 
scaffolds should now be removed because they are no longer needed. The 
`Scheduler` should now be responsible for the `LogicalSlot` allocation.




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663487#comment-16663487
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228100255
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on 
previous allocations and
+ * falls back to using location preference hints if there is no previous 
allocation.
+ */
+public class PreviousAllocationSlotSelectionStrategy implements 
SlotSelectionStrategy {
+
+   public static final PreviousAllocationSlotSelectionStrategy INSTANCE =
+   new PreviousAllocationSlotSelectionStrategy();
+
+   private final LocationPreferenceSlotSelection 
locationPreferenceSlotSelection;
+
+   private PreviousAllocationSlotSelectionStrategy() {
+   this.locationPreferenceSlotSelection = 
LocationPreferenceSlotSelection.INSTANCE;
+   }
+
+   @Nonnull
+   @Override
+   public SlotInfoAndLocality selectBestSlotForProfile(
+   @Nonnull List availableSlots,
+   @Nonnull SlotProfile slotProfile) {
+
+   Collection priorAllocations = 
slotProfile.getPreferredAllocations();
+
+   // First, if there was a prior allocation try to schedule to 
the same/old slot
+   if (!priorAllocations.isEmpty()) {
+
+   HashSet priorAllocationsSet = new 
HashSet<>(priorAllocations);
 
 Review comment:
   Why not change the return type of `SlotProfile#getPreferredAllocations` 
to `Set`?
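
   A minimal sketch of the suggestion, assuming a simplified profile: `Profile`, `firstPreviouslyAllocated`, and the `String` element type are hypothetical stand-ins for the real `SlotProfile` API. The point is that building the set once in the profile removes the `new HashSet<>(...)` copy on every selection call.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class PreferredAllocationsSketch {

    // Hypothetical simplified profile; the real SlotProfile carries more fields.
    static final class Profile {
        private final Set<String> preferredAllocations;

        Profile(Collection<String> preferredAllocations) {
            // The set is built once here instead of on every selection call.
            this.preferredAllocations =
                Collections.unmodifiableSet(new HashSet<>(preferredAllocations));
        }

        Set<String> getPreferredAllocations() {
            return preferredAllocations;
        }
    }

    // Returns the first available slot that was previously allocated, or null if none.
    static String firstPreviouslyAllocated(List<String> availableSlots, Profile profile) {
        Set<String> priorAllocations = profile.getPreferredAllocations();
        for (String slot : availableSlots) {
            if (priorAllocations.contains(slot)) {
                return slot;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Profile profile = new Profile(Arrays.asList("c", "b"));
        System.out.println(firstPreviouslyAllocated(Arrays.asList("a", "b", "c"), profile));
    }
}
```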




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663400#comment-16663400
 ] 

ASF GitHub Bot commented on FLINK-10431:


StefanRRichter commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r228066391
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ##
 @@ -324,78 +297,98 @@ public void disconnectResourceManager() {
boolean allowQueuedScheduling,
Time allocationTimeout) {
 
-   log.debug("Received slot request [{}] for task: {}", 
slotRequestId, task.getTaskToExecute());
-
-   final SlotSharingGroupId slotSharingGroupId = 
task.getSlotSharingGroupId();
-
-   if (slotSharingGroupId != null) {
-   // allocate slot with slot sharing
-   final SlotSharingManager multiTaskSlotManager = 
slotSharingManagers.computeIfAbsent(
-   slotSharingGroupId,
-   id -> new SlotSharingManager(
-   id,
-   this,
-   providerAndOwner));
-
-   final SlotSharingManager.MultiTaskSlotLocality 
multiTaskSlotLocality;
-
-   try {
-   if (task.getCoLocationConstraint() != null) {
-   multiTaskSlotLocality = 
allocateCoLocatedMultiTaskSlot(
-   task.getCoLocationConstraint(),
-   multiTaskSlotManager,
-   slotProfile,
-   allowQueuedScheduling,
-   allocationTimeout);
+   return 
CompletableFuture.completedFuture(null).thenComposeAsync((i) -> {
 
 Review comment:
   In the end, yes, absolutely right. But this is currently on purpose, as is 
the leftover slot pool code that some of your other comments point out could be 
removed. The purpose is that you can now switch back and forth between the new 
and the old code path by changing a single line in the job master, which keeps 
the change as incremental as possible, as we previously discussed.




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662539#comment-16662539
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227849935
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ##
 @@ -324,78 +297,98 @@ public void disconnectResourceManager() {
boolean allowQueuedScheduling,
Time allocationTimeout) {
 
-   log.debug("Received slot request [{}] for task: {}", 
slotRequestId, task.getTaskToExecute());
-
-   final SlotSharingGroupId slotSharingGroupId = 
task.getSlotSharingGroupId();
-
-   if (slotSharingGroupId != null) {
-   // allocate slot with slot sharing
-   final SlotSharingManager multiTaskSlotManager = 
slotSharingManagers.computeIfAbsent(
-   slotSharingGroupId,
-   id -> new SlotSharingManager(
-   id,
-   this,
-   providerAndOwner));
-
-   final SlotSharingManager.MultiTaskSlotLocality 
multiTaskSlotLocality;
-
-   try {
-   if (task.getCoLocationConstraint() != null) {
-   multiTaskSlotLocality = 
allocateCoLocatedMultiTaskSlot(
-   task.getCoLocationConstraint(),
-   multiTaskSlotManager,
-   slotProfile,
-   allowQueuedScheduling,
-   allocationTimeout);
+   return 
CompletableFuture.completedFuture(null).thenComposeAsync((i) -> {
 
 Review comment:
   Shouldn't this method only be called by the new `Scheduler`, which already 
wraps all external calls into the main thread? If so, we don't need to do 
it here again.




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662544#comment-16662544
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227872252
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
 ##
 @@ -145,17 +149,61 @@
  * If the returned future must not be completed right away (a.k.a. the slot request
  * can be queued), allowQueuedScheduling must be set to true.
  *
+* @deprecated this method will be removed once the handling of slot sharing is completely extracted from the slot
+* pool into a dedicated {@link Scheduler} component. The call is then replaced by calls to
+* {@link #getAvailableSlotsInformation()}, {@link #allocateAvailableSlot(SlotRequestId, AllocationID)}, and
+* {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, Time)}.
 
 Review comment:
   What's preventing us from removing this method?




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662540#comment-16662540
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227848647
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ##
 @@ -324,78 +297,98 @@ public void disconnectResourceManager() {
boolean allowQueuedScheduling,
Time allocationTimeout) {
 
-   log.debug("Received slot request [{}] for task: {}", 
slotRequestId, task.getTaskToExecute());
-
-   final SlotSharingGroupId slotSharingGroupId = 
task.getSlotSharingGroupId();
-
-   if (slotSharingGroupId != null) {
-   // allocate slot with slot sharing
-   final SlotSharingManager multiTaskSlotManager = 
slotSharingManagers.computeIfAbsent(
-   slotSharingGroupId,
-   id -> new SlotSharingManager(
-   id,
-   this,
-   providerAndOwner));
-
-   final SlotSharingManager.MultiTaskSlotLocality 
multiTaskSlotLocality;
-
-   try {
-   if (task.getCoLocationConstraint() != null) {
-   multiTaskSlotLocality = 
allocateCoLocatedMultiTaskSlot(
-   task.getCoLocationConstraint(),
-   multiTaskSlotManager,
-   slotProfile,
-   allowQueuedScheduling,
-   allocationTimeout);
+   return 
CompletableFuture.completedFuture(null).thenComposeAsync((i) -> {
 
 Review comment:
   `CompletableFuture.supplyAsync()`
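
A minimal sketch of what this suggestion could look like, using only the JDK. The class `AsyncStartSketch` and the helper `lookup` are hypothetical stand-ins, not code from the PR. Note that `thenComposeAsync` takes a function that returns a future, so `supplyAsync` is a drop-in replacement only when the body yields a plain value; if the body itself returns a future, the result has to be flattened with `thenCompose`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class AsyncStartSketch {

    // Hypothetical stand-in for work that must run on a specific executor.
    public static String lookup(String key) {
        return "slot-for-" + key;
    }

    // Pattern from the diff: start from a completed future, then hop to the executor.
    public static CompletableFuture<String> viaCompletedFuture(String key, Executor executor) {
        return CompletableFuture.completedFuture(null)
            .thenComposeAsync(ignored -> CompletableFuture.completedFuture(lookup(key)), executor);
    }

    // Suggested alternative when the body produces a plain value.
    public static CompletableFuture<String> viaSupplyAsync(String key, Executor executor) {
        return CompletableFuture.supplyAsync(() -> lookup(key), executor);
    }

    // Variant for a body that itself returns a future: flatten with thenCompose.
    public static CompletableFuture<String> viaSupplyAsyncNested(String key, Executor executor) {
        return CompletableFuture
            .supplyAsync(() -> CompletableFuture.completedFuture(lookup(key)), executor)
            .thenCompose(f -> f);
    }
}
```

All three variants run the body on the given executor and complete with the same value; the difference is only in how the chain is started.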


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extract scheduling-related code from SlotPool
> -
>
> Key: FLINK-10431
> URL: https://issues.apache.org/jira/browse/FLINK-10431
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> The other half of the current scheduling logic is the management of slot 
> sharing and is located in the SlotPool. We need to extract this logic into 
> our new Scheduler component from the previous step. This leaves us with a 
> simpler SlotPool that mainly cares about obtaining, holding, and releasing 
> slots in interaction with a ResourceManager. The new Scheduler can now 
> identify slot sharing groups and interacts with the SlotPool.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662546#comment-16662546
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227877459
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
 ##
 @@ -312,7 +313,7 @@ public void validateRunsInMainThread() {
/**
 * Executor which executes runnables in the main thread context.
 */
-   protected static class MainThreadExecutor implements Executor {
+   protected static class MainThreadExecutor implements 
MainThreadExecutable {
 
 Review comment:
   `MainThreadExecutable` was actually intended to be a marker interface for 
the `RpcServer`. I would recommend letting the `MainThreadExecutor` implement 
`ScheduledExecutor` instead. That way, there is no need to change 
`MainThreadExecutable`.
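
To illustrate the idea, here is a rough, JDK-only sketch. `SimpleScheduledExecutor` is a hypothetical, reduced stand-in for a scheduled-executor interface (it is not Flink's `ScheduledExecutor`), and `SingleThreadMainExecutor` is an assumed name; a single dedicated thread plays the role of the main thread.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class MainThreadExecutorSketch {

    /** Hypothetical, reduced scheduled-executor interface (assumption, not Flink's API). */
    public interface SimpleScheduledExecutor extends Executor {
        ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
    }

    /** Runs immediate and delayed work on one dedicated thread. */
    public static final class SingleThreadMainExecutor implements SimpleScheduledExecutor, AutoCloseable {

        private final ScheduledExecutorService mainThread =
            Executors.newSingleThreadScheduledExecutor();

        @Override
        public void execute(Runnable command) {
            mainThread.execute(command);
        }

        @Override
        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
            return mainThread.schedule(command, delay, unit);
        }

        @Override
        public void close() {
            mainThread.shutdownNow();
        }
    }
}
```

Because the executor type itself offers `schedule`, callers that need delayed execution in the main thread do not require a second interface.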




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662547#comment-16662547
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227870899
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ##
 @@ -822,6 +835,29 @@ private SlotAndLocality pollAndAllocateSlot(SlotRequestId 
slotRequestId, SlotPro
return slotFromPool;
}
 
+   @Nullable
+   private AllocatedSlot allocateSlotWithID(@Nonnull SlotRequestId 
slotRequestId, @Nonnull AllocationID allocationID) {
+   AllocatedSlot allocatedSlot = 
availableSlots.tryRemove(allocationID);
+   if (allocatedSlot != null) {
+   allocatedSlots.add(slotRequestId, allocatedSlot);
+   }
+   return allocatedSlot;
+   }
+
+   @Override
+   @Nullable
+   public AllocatedSlot allocateAvailableSlot(
 
 Review comment:
   We could consider returning an `Optional` here to make it more explicit 
that the allocation might not succeed.
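
A small sketch of the `Optional` variant, under simplifying assumptions: slots and ids are plain strings, and `OptionalSlotSketch` with its `offer` helper is a hypothetical toy pool rather than the real `SlotPool`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class OptionalSlotSketch {

    // Hypothetical simplified state: allocation id -> slot, request id -> slot.
    private final Map<String, String> availableSlots = new HashMap<>();
    private final Map<String, String> allocatedSlots = new HashMap<>();

    /** Makes a slot available under the given allocation id. */
    public void offer(String allocationId, String slot) {
        availableSlots.put(allocationId, slot);
    }

    /** Returns the slot if it was available, or an empty Optional instead of null. */
    public Optional<String> allocateAvailableSlot(String slotRequestId, String allocationId) {
        String slot = availableSlots.remove(allocationId);
        if (slot != null) {
            allocatedSlots.put(slotRequestId, slot);
        }
        return Optional.ofNullable(slot);
    }
}
```

With this signature the caller is forced to handle the "no such slot" case, for example via `orElseGet` falling back to a new slot request.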




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662542#comment-16662542
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227872881
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
 ##
 @@ -145,17 +149,61 @@
 * If the returned future must not be completed right away (a.k.a. 
the slot request
 * can be queued), allowQueuedScheduling must be set to true.
 *
+* @deprecated this method will be removed once the handling of slot 
sharing is completely extracted from the slot
+* pool into a dedicated {@link Scheduler} component. The call is then 
replaced by calls to
+* {@link #getAvailableSlotsInformation()}, {@link 
#allocateAvailableSlot(SlotRequestId, AllocationID)}, and
+* {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, 
Time)}.
+*
 * @param slotRequestId identifying the requested slot
 * @param scheduledUnit for which to allocate slot
 * @param slotProfile profile that specifies the requirements for the 
requested slot
 * @param allowQueuedScheduling true if the slot request can be queued 
(e.g. the returned future must not be completed)
 * @param timeout for the operation
 * @return Future which is completed with the allocated {@link 
LogicalSlot}
 */
+   @Deprecated
CompletableFuture allocateSlot(
SlotRequestId slotRequestId,
ScheduledUnit scheduledUnit,
SlotProfile slotProfile,
boolean allowQueuedScheduling,
@RpcTimeout Time timeout);
+
+   /**
+* Returns a list of {@link SlotInfo} objects about all slots that are 
currently available in the slot
+* pool.
+*
+* @return a list of {@link SlotInfo} objects about all slots that are 
currently available in the slot pool.
+*/
+   @Nonnull
+   List getAvailableSlotsInformation();
+
+   /**
+* Allocates the available slot with the given allocation id under the 
given request id. This method returns
+* {@code null} if no slot with the given allocation id is available.
+*
+* @param slotRequestId identifying the requested slot
+* @param allocationID the allocation id of the requested available slot
+* @return the previously available slot with the given allocation id 
or {@code null} if no such slot existed.
+*/
+   @Nullable
+   AllocatedSlot allocateAvailableSlot(
+   @Nonnull SlotRequestId slotRequestId,
+   @Nonnull AllocationID allocationID);
+
+   /**
+* Request the allocation of a new slot from the resource manager. This 
method will not return a slot from the
+* already available slots from the pool, but instead will add a new 
slot to that pool that is immediately allocated
+* and returned.
+*
+* @param slotRequestId identifying the requested slot
+* @param resourceProfile resource profile that specifies the resource 
requirements for the requested slot
+* @param timeout timeout for the allocation procedure
+* @return a newly allocated slot that was previously not available.
+*/
+   @Nonnull
+   CompletableFuture requestNewAllocatedSlot(
 
 Review comment:
   Same here with the `AllocatedSlot`



[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662548#comment-16662548
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227872948
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
 ##
 @@ -90,7 +90,7 @@
 * @param cause of the cancellation
 * @return Future which is completed once the slot request has been 
cancelled
 */
-   CompletableFuture cancelSlotRequest(
+   Acknowledge cancelSlotRequest(
 
 Review comment:
   Can this become `void`?




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662543#comment-16662543
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227871624
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ##
 @@ -136,12 +143,13 @@
 
private String jobManagerAddress;
 
+   private MainThreadExecutable jmMainThreadScheduledExecutor;
+
// 

 
@VisibleForTesting
-   protected SlotPool(RpcService rpcService, JobID jobId, 
SchedulingStrategy schedulingStrategy) {
+   protected SlotPool(JobID jobId, SchedulingStrategy schedulingStrategy) {
 
 Review comment:
   This class still contains a lot of code that has actually moved to the new 
`Scheduler`. Can we remove it? Leaving it in this class makes the change 
extremely hard to review. Of course, this would mean that we also need to adapt 
the `SlotPoolTests`, which makes sense anyway.




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662541#comment-16662541
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227872341
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
 ##
 @@ -145,17 +149,61 @@
 * If the returned future must not be completed right away (a.k.a. 
the slot request
 * can be queued), allowQueuedScheduling must be set to true.
 *
+* @deprecated this method will be removed once the handling of slot 
sharing is completely extracted from the slot
+* pool into a dedicated {@link Scheduler} component. The call is then 
replaced by calls to
+* {@link #getAvailableSlotsInformation()}, {@link 
#allocateAvailableSlot(SlotRequestId, AllocationID)}, and
+* {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, 
Time)}.
+*
 * @param slotRequestId identifying the requested slot
 * @param scheduledUnit for which to allocate slot
 * @param slotProfile profile that specifies the requirements for the 
requested slot
 * @param allowQueuedScheduling true if the slot request can be queued 
(e.g. the returned future must not be completed)
 * @param timeout for the operation
 * @return Future which is completed with the allocated {@link 
LogicalSlot}
 */
+   @Deprecated
CompletableFuture allocateSlot(
SlotRequestId slotRequestId,
ScheduledUnit scheduledUnit,
SlotProfile slotProfile,
boolean allowQueuedScheduling,
@RpcTimeout Time timeout);
+
+   /**
+* Returns a list of {@link SlotInfo} objects about all slots that are 
currently available in the slot
+* pool.
+*
+* @return a list of {@link SlotInfo} objects about all slots that are 
currently available in the slot pool.
+*/
+   @Nonnull
+   List getAvailableSlotsInformation();
 
 Review comment:
   Let's make the return type a `Collection`.




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662545#comment-16662545
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227872749
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
 ##
 @@ -145,17 +149,61 @@
 * If the returned future must not be completed right away (a.k.a. 
the slot request
 * can be queued), allowQueuedScheduling must be set to true.
 *
+* @deprecated this method will be removed once the handling of slot 
sharing is completely extracted from the slot
+* pool into a dedicated {@link Scheduler} component. The call is then 
replaced by calls to
+* {@link #getAvailableSlotsInformation()}, {@link 
#allocateAvailableSlot(SlotRequestId, AllocationID)}, and
+* {@link #requestNewAllocatedSlot(SlotRequestId, ResourceProfile, 
Time)}.
+*
 * @param slotRequestId identifying the requested slot
 * @param scheduledUnit for which to allocate slot
 * @param slotProfile profile that specifies the requirements for the 
requested slot
 * @param allowQueuedScheduling true if the slot request can be queued 
(e.g. the returned future must not be completed)
 * @param timeout for the operation
 * @return Future which is completed with the allocated {@link 
LogicalSlot}
 */
+   @Deprecated
CompletableFuture allocateSlot(
SlotRequestId slotRequestId,
ScheduledUnit scheduledUnit,
SlotProfile slotProfile,
boolean allowQueuedScheduling,
@RpcTimeout Time timeout);
+
+   /**
+* Returns a list of {@link SlotInfo} objects about all slots that are 
currently available in the slot
+* pool.
+*
+* @return a list of {@link SlotInfo} objects about all slots that are 
currently available in the slot pool.
+*/
+   @Nonnull
+   List getAvailableSlotsInformation();
+
+   /**
+* Allocates the available slot with the given allocation id under the 
given request id. This method returns
+* {@code null} if no slot with the given allocation id is available.
+*
+* @param slotRequestId identifying the requested slot
+* @param allocationID the allocation id of the requested available slot
+* @return the previously available slot with the given allocation id 
or {@code null} if no such slot existed.
+*/
+   @Nullable
+   AllocatedSlot allocateAvailableSlot(
 
 Review comment:
   It's not so nice that we leak the `AllocatedSlot`, which is supposed to be an 
internal class of the `SlotPool`. I think it would be better to have an 
interface that allows callers to do what's needed but, for example, does not 
expose the `releasePayload` call.
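
One way to picture this, as a sketch only: the pool keeps its internal slot class and hands out a narrower view. `SlotView`, `InternalSlot`, and the accessor names below are hypothetical and chosen for illustration; only `releasePayload` is taken from the comment above.

```java
public class NarrowSlotViewSketch {

    /** Hypothetical read-only view that is safe to expose to the scheduler. */
    public interface SlotView {
        String getAllocationId();
        String getTaskManagerLocation();
    }

    /** Hypothetical pool-internal slot with extra, mutating operations. */
    static final class InternalSlot implements SlotView {
        private final String allocationId;
        private final String taskManagerLocation;
        private Object payload = new Object();

        InternalSlot(String allocationId, String taskManagerLocation) {
            this.allocationId = allocationId;
            this.taskManagerLocation = taskManagerLocation;
        }

        @Override
        public String getAllocationId() {
            return allocationId;
        }

        @Override
        public String getTaskManagerLocation() {
            return taskManagerLocation;
        }

        // Deliberately not part of SlotView, so callers of the pool cannot reach it.
        void releasePayload() {
            payload = null;
        }
    }

    /** The pool-facing method returns the view type, not the internal class. */
    public static SlotView allocate(String allocationId) {
        return new InternalSlot(allocationId, "tm-1");
    }
}
```

The caller still gets the information it needs for scheduling decisions, while lifecycle operations stay inside the pool.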




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660910#comment-16660910
 ] 

ASF GitHub Bot commented on FLINK-10431:


StefanRRichter commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227466414
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on 
previous allocations and
+ * falls back to using location preference hints if there is no previous 
allocation.
+ */
+public class PreviousAllocationSlotSelectionStrategy implements 
SlotSelectionStrategy {
+
+   public static final PreviousAllocationSlotSelectionStrategy INSTANCE =
+   new PreviousAllocationSlotSelectionStrategy();
+
+   private final LocationPreferenceSlotSelection 
locationPreferenceSlotSelection;
+
+   private PreviousAllocationSlotSelectionStrategy() {
+   this.locationPreferenceSlotSelection = 
LocationPreferenceSlotSelection.INSTANCE;
+   }
+
+   @Nonnull
+   @Override
+   public SlotInfoAndLocality selectBestSlotForProfile(
+   @Nonnull List availableSlots,
+   @Nonnull SlotProfile slotProfile) {
+
+   Collection priorAllocations = 
slotProfile.getPreferredAllocations();
+
+   // First, if there was a prior allocation try to schedule to 
the same/old slot
+   if (!priorAllocations.isEmpty()) {
+
+   HashSet priorAllocationsSet = new 
HashSet<>(priorAllocations);
 
 Review comment:
   In theory: it is a collection, and `contains` on a collection can be expensive, 
depending on the implementation. A hash set offers ideal performance for 
`contains`.
   In practice, this collection is currently always of size 1, so either way 
it should not matter much. But as the code could evolve, I prefer not to create 
a potential complexity bomb :)
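
The point can be sketched as follows, with plain strings standing in for allocation ids and a hypothetical helper name. Copying the prior allocations into a `HashSet` once makes each `contains` check expected constant time, whereas calling `contains` on an arbitrary `Collection` (for example a list) may scan all elements on every call.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class PriorAllocationLookupSketch {

    /** Returns the first available slot id that matches a prior allocation, if any. */
    public static Optional<String> firstPreviouslyAllocated(
            List<String> availableSlotIds,
            Collection<String> priorAllocations) {

        if (priorAllocations.isEmpty()) {
            return Optional.empty();
        }

        // One-time copy; afterwards each lookup is an expected O(1) hash probe.
        Set<String> priorAllocationsSet = new HashSet<>(priorAllocations);

        for (String slotId : availableSlotIds) {
            if (priorAllocationsSet.contains(slotId)) {
                return Optional.of(slotId);
            }
        }
        return Optional.empty();
    }
}
```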




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660888#comment-16660888
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227450833
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultSchedulerFactory.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+
+import javax.annotation.Nonnull;
+
+import java.util.HashMap;
+
+/**
+ * Default implementation of a {@link SchedulerFactory}.
+ */
+public class DefaultSchedulerFactory implements SchedulerFactory {
+
+   @Nonnull
+   private final SlotSelectionStrategy slotSelectionStrategy;
+
+   public DefaultSchedulerFactory(@Nonnull SlotSelectionStrategy 
slotSelectionStrategy) {
+   this.slotSelectionStrategy = slotSelectionStrategy;
+   }
+
+   @Nonnull
+   @Override
+   public Scheduler createScheduler(@Nonnull SlotPoolGateway 
slotPoolGateway) {
+   return new Scheduler(new HashMap<>(), slotSelectionStrategy, 
slotPoolGateway);
 
 Review comment:
   What could be a reasonable initial size of the `HashMap`?
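
For context on the sizing question: `new HashMap<>()` starts with the JDK defaults of capacity 16 and load factor 0.75. If an expected number of slot sharing groups were known (this thread does not state one), a capacity could be derived as in the sketch below. The helper names are hypothetical, and the `+ 1` is a commonly used conservative rounding, not something taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;

public class HashMapSizingSketch {

    // Default load factor of java.util.HashMap.
    private static final float LOAD_FACTOR = 0.75f;

    /** Capacity that lets the map hold the expected entries without resizing. */
    public static int capacityFor(int expectedEntries) {
        return (int) (expectedEntries / LOAD_FACTOR) + 1;
    }

    /** Creates a map presized for the expected number of entries. */
    public static <K, V> Map<K, V> newMapFor(int expectedEntries) {
        return new HashMap<>(capacityFor(expectedEntries));
    }
}
```

For a handful of entries the default constructor is already sufficient, so presizing only pays off when the expected count is large and known up front.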




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660895#comment-16660895
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227456207
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java
 ##
 @@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.function.BooleanSupplier;
+
+/**
+ * Scheduler that assigns tasks to slots. This class is currently work in progress, comments will be updated as we
+ * move forward.
+ */
+public class Scheduler implements SlotProvider, SlotOwner {
+
+   /** Logger */
+   private final Logger log = LoggerFactory.getLogger(getClass());
+
+   /** Strategy that selects the best slot for a given slot allocation request. */
+   @Nonnull
+   private final SlotSelectionStrategy slotSelectionStrategy;
+
+   /** Managers for the different slot sharing groups. */
+   @Nonnull
+   private final Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagersMap;
+
+   /** The slot pool from which slots are allocated. */
+   @Nonnull
+   private final SlotPoolGateway slotPoolGateway;
+
+   /** Executor for running tasks in the job master's main thread. */
+   @Nonnull
+   private Executor componentMainThreadExecutor;
+
+   /** Predicate to check if the current thread is the job master's main thread. */
+   @Nonnull
+   private BooleanSupplier componentMainThreadCheck;
+
+
+   public Scheduler(
+       @Nonnull Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagersMap,
+       @Nonnull SlotSelectionStrategy slotSelectionStrategy,
+       @Nonnull SlotPoolGateway slotPoolGateway) {
+
+       this.slotSelectionStrategy = slotSelectionStrategy;
+       this.slotSharingManagersMap = slotSharingManagersMap;
+       this.slotPoolGateway = slotPoolGateway;
+       this.componentMainThreadExecutor = (runnable) -> {
+           throw new IllegalStateException("Main thread executor not initialized.");
+       };
+       this.componentMainThreadCheck = () -> {
+           throw new IllegalStateException("Main thread checker not initialized");
+       };
+   }
+
+   public void start(@Nonnull Executor mainThreadExecutor, @Nonnull BooleanSupplier mainThreadCheck) {
+       this.componentMainThreadExecutor = mainThreadExecutor;
+       this.componentMainThreadCheck = mainThreadCheck;
+   }

[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660896#comment-16660896
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227460500
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java
 ##
 @@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.function.BooleanSupplier;
+
+/**
+ * Scheduler that assigns tasks to slots. This class is currently work in progress, comments will be updated as we
+ * move forward.
+ */
+public class Scheduler implements SlotProvider, SlotOwner {
+
+   /** Logger */
+   private final Logger log = LoggerFactory.getLogger(getClass());
+
+   /** Strategy that selects the best slot for a given slot allocation request. */
+   @Nonnull
+   private final SlotSelectionStrategy slotSelectionStrategy;
+
+   /** Managers for the different slot sharing groups. */
+   @Nonnull
+   private final Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagersMap;
+
+   /** The slot pool from which slots are allocated. */
+   @Nonnull
+   private final SlotPoolGateway slotPoolGateway;
+
+   /** Executor for running tasks in the job master's main thread. */
+   @Nonnull
+   private Executor componentMainThreadExecutor;
+
+   /** Predicate to check if the current thread is the job master's main thread. */
+   @Nonnull
+   private BooleanSupplier componentMainThreadCheck;
+
+
+   public Scheduler(
+       @Nonnull Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagersMap,
+       @Nonnull SlotSelectionStrategy slotSelectionStrategy,
+       @Nonnull SlotPoolGateway slotPoolGateway) {
+
+       this.slotSelectionStrategy = slotSelectionStrategy;
+       this.slotSharingManagersMap = slotSharingManagersMap;
+       this.slotPoolGateway = slotPoolGateway;
+       this.componentMainThreadExecutor = (runnable) -> {
+           throw new IllegalStateException("Main thread executor not initialized.");
+       };
+       this.componentMainThreadCheck = () -> {
+           throw new IllegalStateException("Main thread checker not initialized");
+       };
+   }
+
+   public void start(@Nonnull Executor mainThreadExecutor, @Nonnull BooleanSupplier mainThreadCheck) {
+       this.componentMainThreadExecutor = mainThreadExecutor;
+       this.componentMainThreadCheck = mainThreadCheck;
+   }

[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660893#comment-16660893
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227453190
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelection.java
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on location preference hints.
+ */
+public class LocationPreferenceSlotSelection implements SlotSelectionStrategy {
+
+   public static final LocationPreferenceSlotSelection INSTANCE = new LocationPreferenceSlotSelection();
+
+   private LocationPreferenceSlotSelection() {
+   }
+
+   /**
+    * Calculates the candidate's locality score.
+    */
+   private static final BiFunction<Integer, Integer, Integer> LOCALITY_EVALUATION_FUNCTION =
+       (localWeigh, hostLocalWeigh) -> localWeigh * 10 + hostLocalWeigh;
+
+   @Nonnull
+   @Override
+   public SlotInfoAndLocality selectBestSlotForProfile(
+       @Nonnull List<SlotInfo> availableSlots,
+       @Nonnull SlotProfile slotProfile) {
+
+       Collection<TaskManagerLocation> locationPreferences = slotProfile.getPreferredLocations();
+
+       if (availableSlots.isEmpty()) {
+           return SlotInfoAndLocality.of(null, Locality.UNKNOWN);
+       }
+
+       final ResourceProfile resourceProfile = slotProfile.getResourceProfile();
+
+       // if we have no location preferences, we can only filter by the additional requirements.
+       if (locationPreferences.isEmpty()) {
+           for (SlotInfo candidate : availableSlots) {
+               if (candidate.getResourceProfile().isMatching(resourceProfile)) {
+                   return SlotInfoAndLocality.of(candidate, Locality.UNCONSTRAINED);
+               }
+           }
+           return SlotInfoAndLocality.of(null, Locality.UNKNOWN);
+       }
+
+       // we build up two indexes, one for resource id and one for host names of the preferred locations.
+       final Map<ResourceID, Integer> preferredResourceIDs = new HashMap<>(locationPreferences.size());
+       final Map<String, Integer> preferredFQHostNames = new HashMap<>(locationPreferences.size());
+
+       for (TaskManagerLocation locationPreference : locationPreferences) {
+           preferredResourceIDs.merge(locationPreference.getResourceID(), 1, Integer::sum);
+           preferredFQHostNames.merge(locationPreference.getFQDNHostname(), 1, Integer::sum);
+       }
+
+       SlotInfo bestCandidate = null;
+       Locality bestCandidateLocality = Locality.UNKNOWN;
+       int bestCandidateScore = Integer.MIN_VALUE;
+
+       for (SlotInfo candidate : availableSlots) {
+
+           if (candidate.getResourceProfile().isMatching(resourceProfile)) {
+
+               // this gets candidate is local-weigh
+               Integer localWeigh = preferredResourceIDs.getOrDefault(candidate.getTaskManagerLocation().getResourceID(), 0);
+
+

[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660890#comment-16660890
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227449157
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
 ##
 @@ -285,7 +290,9 @@ public JobMaster(
 
    this.slotPool = checkNotNull(slotPoolFactory).createSlotPool(jobGraph.getJobID());
 
-   this.slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
+   this.slotPoolGateway = slotPool;
 
 Review comment:
   Can we remove this field?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extract scheduling-related code from SlotPool
> -
>
> Key: FLINK-10431
> URL: https://issues.apache.org/jira/browse/FLINK-10431
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> The other half of the current scheduling logic is the management of slot 
> sharing and is located in the SlotPool. We need to extract this logic into 
> our new Scheduler component from the previous step. This leaves us with a 
> simpler SlotPool that mainly cares about obtaining, holding, and releasing 
> slots in interaction with a ResourceManager. The new Scheduler can now 
> identify slot sharing groups and interacts with the SlotPool.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660889#comment-16660889
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227450350
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotActions.java
 ##
 @@ -39,10 +38,23 @@
 * @param slotRequestId identifying the slot to release
  * @param slotSharingGroupId identifying the slot sharing group to which the slot belongs, null if none
 * @param cause of the slot release, null if none
-* @return Acknowledge (future) after the slot has been released
+* @return Acknowledge after the slot has been released
 */
-   CompletableFuture<Acknowledge> releaseSlot(
+   @Deprecated
+   Acknowledge releaseSlot(
 
 Review comment:
   Could be `void`?
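
To illustrate the suggestion, here is a minimal sketch, assuming the call already runs synchronously in the main thread. In that case a returned `Acknowledge` says nothing beyond "the method returned normally", so `void` expresses the same contract. The interface names `ReleaseWithAck` and `ReleaseVoid`, the `Acknowledge` stand-in, and the use of `String` for the request ID are all hypothetical and are not Flink's actual types.

```java
import java.util.ArrayList;
import java.util.List;

final class VoidReturnSketch {

    /** Hypothetical stand-in for a singleton acknowledgement token. */
    enum Acknowledge { INSTANCE }

    /** Shape in the diff: a synchronous call that returns a constant. */
    interface ReleaseWithAck {
        Acknowledge releaseSlot(String slotRequestId, Throwable cause);
    }

    /** Shape suggested in the review: a normal return already signals success. */
    interface ReleaseVoid {
        void releaseSlot(String slotRequestId, Throwable cause);
    }

    /** Runs both variants and returns the request IDs that were released. */
    static List<String> demo() {
        List<String> released = new ArrayList<>();
        ReleaseWithAck withAck = (id, cause) -> {
            released.add(id);
            return Acknowledge.INSTANCE; // carries no information for the caller
        };
        ReleaseVoid withVoid = (id, cause) -> released.add(id);
        withAck.releaseSlot("a", null);
        withVoid.releaseSlot("b", null);
        return released;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Failures would still surface as exceptions in both variants, so callers lose nothing by dropping the return value.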




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660892#comment-16660892
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227450458
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotActions.java
 ##
 @@ -39,10 +38,23 @@
 * @param slotRequestId identifying the slot to release
  * @param slotSharingGroupId identifying the slot sharing group to which the slot belongs, null if none
 * @param cause of the slot release, null if none
-* @return Acknowledge (future) after the slot has been released
+* @return Acknowledge after the slot has been released
 */
-   CompletableFuture<Acknowledge> releaseSlot(
+   @Deprecated
+   Acknowledge releaseSlot(
SlotRequestId slotRequestId,
@Nullable SlotSharingGroupId slotSharingGroupId,
@Nullable Throwable cause);
+
+   /**
+* Releases the slot with the given {@link SlotRequestId}. Additionally, one can provide a cause for the slot release.
+*
+* @param slotRequestId identifying the slot to release
+* @param cause of the slot release, null if none
+* @return Acknowledge after the slot has been released
+*/
+   @Nonnull
+   Acknowledge releaseSlot(
 
 Review comment:
   Same here with `void`?




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660894#comment-16660894
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227452749
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelection.java
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on location preference hints.
+ */
+public class LocationPreferenceSlotSelection implements SlotSelectionStrategy {
+
+   public static final LocationPreferenceSlotSelection INSTANCE = new LocationPreferenceSlotSelection();
+
+   private LocationPreferenceSlotSelection() {
+   }
+
+   /**
+    * Calculates the candidate's locality score.
+    */
+   private static final BiFunction<Integer, Integer, Integer> LOCALITY_EVALUATION_FUNCTION =
+       (localWeigh, hostLocalWeigh) -> localWeigh * 10 + hostLocalWeigh;
+
+   @Nonnull
+   @Override
+   public SlotInfoAndLocality selectBestSlotForProfile(
+       @Nonnull List<SlotInfo> availableSlots,
+       @Nonnull SlotProfile slotProfile) {
+
+       Collection<TaskManagerLocation> locationPreferences = slotProfile.getPreferredLocations();
+
+       if (availableSlots.isEmpty()) {
+           return SlotInfoAndLocality.of(null, Locality.UNKNOWN);
+       }
+
+       final ResourceProfile resourceProfile = slotProfile.getResourceProfile();
+
+       // if we have no location preferences, we can only filter by the additional requirements.
+       if (locationPreferences.isEmpty()) {
+           for (SlotInfo candidate : availableSlots) {
+               if (candidate.getResourceProfile().isMatching(resourceProfile)) {
+                   return SlotInfoAndLocality.of(candidate, Locality.UNCONSTRAINED);
+               }
+           }
+           return SlotInfoAndLocality.of(null, Locality.UNKNOWN);
 
 Review comment:
   Factor out in separate method, e.g. `selectBestSlotByResourceProfile`
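
A rough sketch of what the extracted method could look like. The `Slot` class and the integer "capacity" model are hypothetical stand-ins for `SlotInfo` and `ResourceProfile`, and returning `Optional` is a simplification here; the code in the diff returns `SlotInfoAndLocality.of(null, Locality.UNKNOWN)` when nothing matches.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

final class SelectByResourceProfileSketch {

    /** Hypothetical stand-in for a slot; only models offered vs. required capacity. */
    static final class Slot {
        final String name;
        final int capacity;

        Slot(String name, int capacity) {
            this.name = name;
            this.capacity = capacity;
        }

        boolean isMatching(int required) {
            return capacity >= required;
        }
    }

    /** Returns the first slot whose resources match, mirroring the loop in the diff. */
    static Optional<Slot> selectBestSlotByResourceProfile(List<Slot> availableSlots, int required) {
        for (Slot candidate : availableSlots) {
            if (candidate.isMatching(required)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Slot> slots = Arrays.asList(new Slot("small", 1), new Slot("large", 4));
        System.out.println(selectBestSlotByResourceProfile(slots, 2).map(s -> s.name).orElse("none"));
    }
}
```

With the loop in its own method, the branch for empty location preferences in `selectBestSlotForProfile` shrinks to a single call.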



[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660887#comment-16660887
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227448691
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
 ##
 @@ -172,8 +172,8 @@ else if (ret instanceof CompletableFuture) {
}
 
@Override
-   public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
-       return CompletableFuture.completedFuture(Acknowledge.get());
+   public Acknowledge cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
 
 Review comment:
   I guess it could have a `void` return type.




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660897#comment-16660897
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227459571
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java
 ##
 @@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.function.BooleanSupplier;
+
+/**
+ * Scheduler that assigns tasks to slots. This class is currently work in progress, comments will be updated as we
+ * move forward.
+ */
+public class Scheduler implements SlotProvider, SlotOwner {
+
+   /** Logger */
+   private final Logger log = LoggerFactory.getLogger(getClass());
+
+   /** Strategy that selects the best slot for a given slot allocation request. */
+   @Nonnull
+   private final SlotSelectionStrategy slotSelectionStrategy;
+
+   /** Managers for the different slot sharing groups. */
+   @Nonnull
+   private final Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagersMap;
+
+   /** The slot pool from which slots are allocated. */
+   @Nonnull
+   private final SlotPoolGateway slotPoolGateway;
+
+   /** Executor for running tasks in the job master's main thread. */
+   @Nonnull
+   private Executor componentMainThreadExecutor;
+
+   /** Predicate to check if the current thread is the job master's main thread. */
+   @Nonnull
+   private BooleanSupplier componentMainThreadCheck;
+
+
+   public Scheduler(
+       @Nonnull Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagersMap,
+       @Nonnull SlotSelectionStrategy slotSelectionStrategy,
+       @Nonnull SlotPoolGateway slotPoolGateway) {
+
+       this.slotSelectionStrategy = slotSelectionStrategy;
+       this.slotSharingManagersMap = slotSharingManagersMap;
+       this.slotPoolGateway = slotPoolGateway;
+       this.componentMainThreadExecutor = (runnable) -> {
+           throw new IllegalStateException("Main thread executor not initialized.");
+       };
+       this.componentMainThreadCheck = () -> {
+           throw new IllegalStateException("Main thread checker not initialized");
+       };
+   }
+
+   public void start(@Nonnull Executor mainThreadExecutor, @Nonnull BooleanSupplier mainThreadCheck) {
+       this.componentMainThreadExecutor = mainThreadExecutor;
+       this.componentMainThreadCheck = mainThreadCheck;
+   }

[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660900#comment-16660900
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227454366
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on previous allocations and
+ * falls back to using location preference hints if there is no previous allocation.
+ */
+public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStrategy {
+
+   public static final PreviousAllocationSlotSelectionStrategy INSTANCE =
+   new PreviousAllocationSlotSelectionStrategy();
+
+   private final LocationPreferenceSlotSelection locationPreferenceSlotSelection;
+
+   private PreviousAllocationSlotSelectionStrategy() {
+   this.locationPreferenceSlotSelection = LocationPreferenceSlotSelection.INSTANCE;
+   }
+
+   @Nonnull
+   @Override
+   public SlotInfoAndLocality selectBestSlotForProfile(
+   @Nonnull List<SlotInfo> availableSlots,
+   @Nonnull SlotProfile slotProfile) {
+
+   Collection<AllocationID> priorAllocations = slotProfile.getPreferredAllocations();
+
+   // First, if there was a prior allocation try to schedule to the same/old slot
+   if (!priorAllocations.isEmpty()) {
+
+   HashSet<AllocationID> priorAllocationsSet = new HashSet<>(priorAllocations);
 
 Review comment:
   Why are we creating a new `HashSet` here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extract scheduling-related code from SlotPool
> -
>
> Key: FLINK-10431
> URL: https://issues.apache.org/jira/browse/FLINK-10431
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> The other half of the current scheduling logic is the management of slot 
> sharing and is located in the SlotPool. We need to extract this logic into 
> our new Scheduler component from the previous step. This leaves us with a 
> simpler SlotPool that mainly cares about obtaining, holding, and releasing 
> slots in interaction with a ResourceManager. The new Scheduler can now 
> identify slot sharing groups and interacts with the SlotPool.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660901#comment-16660901
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227456417
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/Scheduler.java
 ##
 @@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.function.BooleanSupplier;
+
+/**
+ * Scheduler that assigns tasks to slots. This class is currently work in progress, comments will be updated as we
+ * move forward.
+ */
+public class Scheduler implements SlotProvider, SlotOwner {
+
+   /** Logger */
+   private final Logger log = LoggerFactory.getLogger(getClass());
+
+   /** Strategy that selects the best slot for a given slot allocation request. */
+   @Nonnull
+   private final SlotSelectionStrategy slotSelectionStrategy;
+
+   /** Managers for the different slot sharing groups. */
+   @Nonnull
+   private final Map slotSharingManagersMap;
+
+   /** The slot pool from which slots are allocated. */
+   @Nonnull
+   private final SlotPoolGateway slotPoolGateway;
+
+   /** Executor for running tasks in the job master's main thread. */
+   @Nonnull
+   private Executor componentMainThreadExecutor;
+
+   /** Predicate to check if the current thread is the job master's main thread. */
+   @Nonnull
+   private BooleanSupplier componentMainThreadCheck;
+
+
+   public Scheduler(
+   @Nonnull Map slotSharingManagersMap,
+   @Nonnull SlotSelectionStrategy slotSelectionStrategy,
+   @Nonnull SlotPoolGateway slotPoolGateway) {
+
+   this.slotSelectionStrategy = slotSelectionStrategy;
+   this.slotSharingManagersMap = slotSharingManagersMap;
+   this.slotPoolGateway = slotPoolGateway;
+   this.componentMainThreadExecutor = (runnable) -> {
+   throw new IllegalStateException("Main thread executor not initialized.");
+   };
+   this.componentMainThreadCheck = () -> {
+   throw new IllegalStateException("Main thread checker not initialized");
+   };
+   }
+
+   public void start(@Nonnull Executor mainThreadExecutor, @Nonnull BooleanSupplier mainThreadCheck) {
+   this.componentMainThreadExecutor = mainThreadExecutor;
+   this.componentMainThreadCheck = mainThreadCheck;
+   }

[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660891#comment-16660891
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227451887
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelection.java
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on location preference hints.
+ */
+public class LocationPreferenceSlotSelection implements SlotSelectionStrategy {
 
 Review comment:
   Singletons are best implemented using an `enum`:
   ```
   enum LocationPreferenceSlotSelection implements SlotSelectionStrategy {
       INSTANCE;

       @Override
       public SlotInfoAndLocality selectBestSlotForProfile(/* ... */) {
           // ...
       }
   }
   ```
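
A minimal, self-contained sketch of the enum-singleton idiom suggested above. `SelectionStrategy`, `select`, and `FirstAvailableSelection` are hypothetical stand-ins for `SlotSelectionStrategy`, `selectBestSlotForProfile`, and the strategy class, simplified to strings so that the example compiles without Flink on the classpath.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for SlotSelectionStrategy; not the Flink interface.
interface SelectionStrategy {
    String select(List<String> availableSlots);
}

// A single-element enum is a singleton: the JVM creates exactly one instance,
// and neither serialization nor reflection can produce a second one.
enum FirstAvailableSelection implements SelectionStrategy {
    INSTANCE;

    @Override
    public String select(List<String> availableSlots) {
        // Placeholder logic: pick the first slot, or null if there is none.
        return availableSlots.isEmpty() ? null : availableSlots.get(0);
    }
}

class EnumSingletonDemo {
    public static void main(String[] args) {
        SelectionStrategy strategy = FirstAvailableSelection.INSTANCE;
        System.out.println(strategy.select(Arrays.asList("slot-a", "slot-b")));
    }
}
```

Compared with a `public static final INSTANCE` field plus a private constructor, the enum form needs no explicit constructor and keeps the single-instance guarantee without extra code.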




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660898#comment-16660898
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227453757
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on previous allocations and
+ * falls back to using location preference hints if there is no previous allocation.
+ */
+public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStrategy {
 
 Review comment:
   Same comment about singletons: use an `enum` for that.




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660899#comment-16660899
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227455130
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on previous allocations and
+ * falls back to using location preference hints if there is no previous allocation.
+ */
+public class PreviousAllocationSlotSelectionStrategy implements SlotSelectionStrategy {
+
+   public static final PreviousAllocationSlotSelectionStrategy INSTANCE =
+   new PreviousAllocationSlotSelectionStrategy();
+
+   private final LocationPreferenceSlotSelection locationPreferenceSlotSelection;
+
+   private PreviousAllocationSlotSelectionStrategy() {
+   this.locationPreferenceSlotSelection = LocationPreferenceSlotSelection.INSTANCE;
+   }
+
+   @Nonnull
+   @Override
+   public SlotInfoAndLocality selectBestSlotForProfile(
+   @Nonnull List<SlotInfo> availableSlots,
+   @Nonnull SlotProfile slotProfile) {
+
+   Collection<AllocationID> priorAllocations = slotProfile.getPreferredAllocations();
+
+   // First, if there was a prior allocation try to schedule to the same/old slot
+   if (!priorAllocations.isEmpty()) {
+
+   HashSet<AllocationID> priorAllocationsSet = new HashSet<>(priorAllocations);
+   for (SlotInfo availableSlot : availableSlots) {
+   if (priorAllocationsSet.contains(availableSlot.getAllocationId())) {
+   return SlotInfoAndLocality.of(availableSlot, Locality.LOCAL);
+   }
+   }
+   }
+
+   // Second, select based on location preference, excluding blacklisted allocations
+   Set<AllocationID> blackListedAllocations = slotProfile.getPreviousExecutionGraphAllocations();
+   if (blackListedAllocations.isEmpty()) {
+   return locationPreferenceSlotSelection.selectBestSlotForProfile(availableSlots, slotProfile);
+   } else {
+   ArrayList<SlotInfo> availableAndAllowedSlots = new ArrayList<>(availableSlots.size());
+   for (SlotInfo availableSlot : availableSlots) {
+   if (!blackListedAllocations.contains(availableSlot.getAllocationId())) {
+   availableAndAllowedSlots.add(availableSlot);
+   }
+   }
+   return locationPreferenceSlotSelection.selectBestSlotForProfile(availableAndAllowedSlots, slotProfile);
+   }
 
 Review comment:
   I would move 
`locationPreferenceSlotSelection.selectBestSlotForProfile(availableAndAllowedSlots, slotProfile)` 
out of the if-else blocks and only do the filtering of `availableSlots` in the 
branches. This reduces code duplication a bit.
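
The suggested restructuring could look roughly like the sketch below. It is a simplified illustration, not the code from the pull request: slots and allocation ids are plain strings, and `selectBest` is a hypothetical stand-in for `locationPreferenceSlotSelection.selectBestSlotForProfile`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

class FilterThenSelect {

    // Hypothetical stand-in for the location-preference selection.
    static String selectBest(List<String> candidates) {
        return candidates.isEmpty() ? null : candidates.get(0);
    }

    static String select(List<String> availableSlots, Set<String> blacklisted) {
        // Only the filtering differs between the two branches.
        List<String> candidates;
        if (blacklisted.isEmpty()) {
            candidates = availableSlots;
        } else {
            candidates = new ArrayList<>(availableSlots.size());
            for (String slot : availableSlots) {
                if (!blacklisted.contains(slot)) {
                    candidates.add(slot);
                }
            }
        }
        // The delegation happens exactly once, after the branches.
        return selectBest(candidates);
    }

    public static void main(String[] args) {
        List<String> slots = Arrays.asList("a", "b", "c");
        System.out.println(select(slots, Collections.<String>emptySet()));
        System.out.println(select(slots, Collections.singleton("a")));
    }
}
```

With this shape, a change to how the final selection is invoked has to be made in one place only.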



[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660839#comment-16660839
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227447242
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -1676,6 +1684,35 @@ public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
}
}
 
+   /**
+* Computes and returns a set with the prior allocation ids from all execution vertices in the graph.
+*/
+   private Set computeAllPriorAllocationIds() {
+   HashSet allPreviousAllocationIds = new HashSet<>();
+   Iterable ejvIterable = getVerticesTopologically();
+   for (ExecutionJobVertex executionJobVertex : ejvIterable) {
+   for (ExecutionVertex executionVertex : executionJobVertex.getTaskVertices()) {
 
 Review comment:
   `getAllExecutionVertices` should be easier to use here.
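
To illustrate why a flattened accessor is easier to use than the nested loops in the hunk, here is a self-contained sketch. All types (`SketchVertex`, `SketchJobVertex`, `SketchGraph`) and the `getAllVertices` method are hypothetical simplifications; they only mimic the shape of the classes in the diff and are not the Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-ins; not the Flink classes.
class SketchVertex {
    final String priorAllocationId; // null if there was no prior allocation

    SketchVertex(String priorAllocationId) {
        this.priorAllocationId = priorAllocationId;
    }
}

class SketchJobVertex {
    final SketchVertex[] taskVertices;

    SketchJobVertex(SketchVertex... taskVertices) {
        this.taskVertices = taskVertices;
    }
}

class SketchGraph {
    final List<SketchJobVertex> jobVertices;

    SketchGraph(List<SketchJobVertex> jobVertices) {
        this.jobVertices = jobVertices;
    }

    // Flattened view over all vertices, so callers need a single loop.
    Iterable<SketchVertex> getAllVertices() {
        List<SketchVertex> all = new ArrayList<>();
        for (SketchJobVertex jobVertex : jobVertices) {
            all.addAll(Arrays.asList(jobVertex.taskVertices));
        }
        return all;
    }

    Set<String> computeAllPriorAllocationIds() {
        Set<String> ids = new HashSet<>();
        for (SketchVertex vertex : getAllVertices()) {
            if (vertex.priorAllocationId != null) {
                ids.add(vertex.priorAllocationId);
            }
        }
        return ids;
    }
}
```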




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660838#comment-16660838
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227446797
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -1676,6 +1684,35 @@ public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
}
}
 
+   /**
+* Computes and returns a set with the prior allocation ids from all execution vertices in the graph.
+*/
+   private Set computeAllPriorAllocationIds() {
+   HashSet allPreviousAllocationIds = new HashSet<>();
 
 Review comment:
   Initialize `HashSet` with `getNumberOfExecutionJobVertices()`
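
As a hedged illustration of pre-sizing: `HashSet(int)` takes an initial capacity rather than an element count, so an estimate is usually padded for the default load factor of 0.75. The helper names below (`capacityFor`, `newHashSetWithExpectedSize`) and the value `expectedIds` are hypothetical and not part of the pull request.

```java
import java.util.HashSet;
import java.util.Set;

class PresizedSets {

    // With the default load factor of 0.75, the backing table grows once the
    // element count exceeds 0.75 * capacity (capacity is rounded up to a
    // power of two internally), so the requested capacity is padded here.
    static int capacityFor(int expectedElements) {
        return (int) (expectedElements / 0.75f) + 1;
    }

    static <T> Set<T> newHashSetWithExpectedSize(int expectedElements) {
        return new HashSet<>(capacityFor(expectedElements));
    }

    public static void main(String[] args) {
        // Hypothetical size estimate, standing in for a vertex count.
        int expectedIds = 12;
        Set<String> ids = newHashSetWithExpectedSize(expectedIds);
        ids.add("allocation-1");
        System.out.println(ids.size());
    }
}
```

Passing the estimate directly, as in `new HashSet<>(n)`, is also valid; it just leaves less headroom before the first resize.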




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660835#comment-16660835
 ] 

ASF GitHub Bot commented on FLINK-10431:


StefanRRichter commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227446372
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -450,6 +454,7 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
SlotProvider slotProvider,
boolean queued,
LocationPreferenceConstraint locationPreferenceConstraint,
+   @Nullable Set allPreviousExecutionGraphAllocationIds,
 
 Review comment:
   Will do. Overall this construct that uses `null` will go away once we 
introduce group scheduling.




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660836#comment-16660836
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227446397
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -939,13 +942,18 @@ public void scheduleForExecution() throws JobException {
// collecting all the slots may resize and fail in that 
operation without slots getting lost
final ArrayList> 
allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
 
+   // this is a (temporary) to avoid collecting the previous 
allocations for all executions again and again
 
 Review comment:
   noun missing in the comment




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660833#comment-16660833
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227445757
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -450,6 +454,7 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
SlotProvider slotProvider,
boolean queued,
LocationPreferenceConstraint locationPreferenceConstraint,
+   @Nullable Set allPreviousExecutionGraphAllocationIds,
 
 Review comment:
   Please update the JavaDocs stating what `null` for this parameter means.




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660827#comment-16660827
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227444801
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -450,6 +454,7 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
SlotProvider slotProvider,
boolean queued,
LocationPreferenceConstraint locationPreferenceConstraint,
+   @Nullable Set allPreviousExecutionGraphAllocationIds,
 
 Review comment:
   Same here with `@Nullable`




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660829#comment-16660829
 ] 

ASF GitHub Bot commented on FLINK-10431:


StefanRRichter commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227445579
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -385,7 +386,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
return scheduleForExecution(
resourceProvider,
allowQueued,
-   LocationPreferenceConstraint.ANY);
+   LocationPreferenceConstraint.ANY,
+   null);
 
 Review comment:
   Point is that `null` and `emptySet` have different meanings here: `null` 
means it was not previously computed/provided and has to be determined inside 
the execution. `emptySet` means it has been provided and the result was just 
empty. Overall, this whole construct should go away very soon when we move to 
group scheduling.
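
The distinction described above can be sketched in a few lines of Java. This is only an illustration, not code from the PR: the class `PriorAllocations`, the helper `computeFromGraph`, and the use of plain strings as allocation ids are all hypothetical stand-ins.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical illustration of the null-versus-empty convention; not Flink code. */
final class PriorAllocations {

    /** Stand-in for the computation that would happen "inside the execution". */
    static Set<String> computeFromGraph() {
        return new HashSet<>(Arrays.asList("alloc-1", "alloc-2"));
    }

    /**
     * null      : the caller did not provide the ids, so they are computed here.
     * empty set : the caller provided the ids and there are none; nothing is recomputed.
     */
    static Set<String> resolve(Set<String> provided) {
        return provided != null ? provided : computeFromGraph();
    }
}
```

Replacing `null` with `Collections.emptySet()` at the call site would silently change the behavior in this sketch, because the empty set skips the computation. A signature that makes the "not provided" case explicit (for example an overload without the parameter) is one possible way to address the reviewer's concern without losing the distinction.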




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660828#comment-16660828
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227444632
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -402,13 +404,15 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
public CompletableFuture scheduleForExecution(
SlotProvider slotProvider,
boolean queued,
-   LocationPreferenceConstraint locationPreferenceConstraint) {
+   LocationPreferenceConstraint locationPreferenceConstraint,
+   @Nullable Set allPreviousExecutionGraphAllocationIds) {
 
 Review comment:
   Let's try not to pass in null arguments




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660831#comment-16660831
 ] 

ASF GitHub Bot commented on FLINK-10431:


StefanRRichter commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227445579
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -385,7 +386,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
return scheduleForExecution(
resourceProvider,
allowQueued,
-   LocationPreferenceConstraint.ANY);
+   LocationPreferenceConstraint.ANY,
+   null);
 
 Review comment:
   Point is that `null` and `emptySet` have different meanings here: `null` 
means it was not previously computed/provided and has to be determined inside 
the execution. `emptySet` means it has been provided and the result was just 
empty. Overall, this whole construct should go away very soon when we move to 
group scheduling.




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660830#comment-16660830
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r22705
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -385,7 +386,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
return scheduleForExecution(
resourceProvider,
allowQueued,
-   LocationPreferenceConstraint.ANY);
+   LocationPreferenceConstraint.ANY,
+   null);
 
 Review comment:
   Let's not use `null` but instead `Collections.emptySet()`




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660824#comment-16660824
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227444801
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -450,6 +454,7 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
SlotProvider slotProvider,
boolean queued,
LocationPreferenceConstraint locationPreferenceConstraint,
+   @Nullable Set allPreviousExecutionGraphAllocationIds,
 
 Review comment:
   Same here with `@Nullable`




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660823#comment-16660823
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r227444632
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -402,13 +404,15 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
public CompletableFuture scheduleForExecution(
SlotProvider slotProvider,
boolean queued,
-   LocationPreferenceConstraint locationPreferenceConstraint) {
+   LocationPreferenceConstraint locationPreferenceConstraint,
+   @Nullable Set allPreviousExecutionGraphAllocationIds) {
 
 Review comment:
   Let's try not to pass in null arguments




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660822#comment-16660822
 ] 

ASF GitHub Bot commented on FLINK-10431:


tillrohrmann commented on a change in pull request #6898: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898#discussion_r22705
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -385,7 +386,8 @@ public void setInitialState(@Nullable JobManagerTaskRestore taskRestore) {
return scheduleForExecution(
resourceProvider,
allowQueued,
-   LocationPreferenceConstraint.ANY);
+   LocationPreferenceConstraint.ANY,
+   null);
 
 Review comment:
   Let's not use `null` but instead `Collections.emptySet()`




[jira] [Commented] (FLINK-10431) Extract scheduling-related code from SlotPool

2018-10-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659365#comment-16659365
 ] 

ASF GitHub Bot commented on FLINK-10431:


StefanRRichter opened a new pull request #6898: [FLINK-10431] Extraction of 
scheduling-related code from SlotPool into preliminary Scheduler
URL: https://github.com/apache/flink/pull/6898
 
 
   ## What is the purpose of the change
   
   This PR extracts the scheduling-related code (e.g. slot sharing logic) from 
the slot pool into a preliminary version of a future scheduler component. Our 
primary goal is fixing the scheduling logic for local recovery. Changes in this 
PR open up potential for more code cleanups (e.g. removing all scheduling 
concerns from the slot pool, removing `ProviderAndOwner`, moving away from some 
`CompletableFuture` return types, etc). This cleanup and some test rewrites 
will happen in a followup PR.
   
   ## Brief change log
   
   - SlotPool is no longer an `RpcEndpoint`, so we need to take care that all 
state modification now happens in the component's main thread.
   - Introduced `SlotInfo` and moved the slot sharing code into a scheduler 
component. Slot pool code can now deal with single slot requests. The pattern 
of interaction is more explicit, with 3 main new methods: 
`getAvailableSlotsInformation` to list available slots, `allocateAvailableSlot` 
to allocate a listed / available slot, and `requestNewAllocatedSlot` to request 
a new slot from the resource manager. The old codepaths currently still 
co-exist in the slot pool and will be removed in followup work.
   - Introduced the computation of a collection of all previous allocation ids 
through `ExecutionGraph::computeAllPriorAllocationIds`. This serves as the 
basis to compute a "blacklist" of allocation ids that we use to fix the 
scheduling of local recovery.
   - Provided an improved version of the scheduling for local recovery that 
uses this blacklist.
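
The interaction pattern from the change log can be sketched as a toy model. Only the three method names come from the text above. The classes `ToySlotPool` and `ToyScheduler`, the use of plain strings as allocation ids, and the exact blacklist rule (skip slots that some execution used before, unless it is this execution's own previous slot) are assumptions made for illustration, not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Hypothetical toy slot pool; slots are identified by their allocation id. */
final class ToySlotPool {
    private final Set<String> available = new LinkedHashSet<>();
    private int requested = 0;

    void addAvailable(String allocationId) {
        available.add(allocationId);
    }

    /** Lists the available slots without allocating any of them. */
    List<String> getAvailableSlotsInformation() {
        return new ArrayList<>(available);
    }

    /** Allocates a previously listed slot; empty if it is no longer available. */
    Optional<String> allocateAvailableSlot(String allocationId) {
        return available.remove(allocationId) ? Optional.of(allocationId) : Optional.empty();
    }

    /** Stand-in for requesting a brand-new slot from the resource manager. */
    String requestNewAllocatedSlot() {
        return "new-" + (requested++);
    }
}

/** Hypothetical scheduler step: list, then allocate, then fall back to a new request. */
final class ToyScheduler {
    static String schedule(ToySlotPool pool, String previousAllocation, Set<String> blacklist) {
        List<String> candidates = pool.getAvailableSlotsInformation();

        // 1. Prefer the slot this execution used before, since its local state lives there.
        if (previousAllocation != null && candidates.contains(previousAllocation)) {
            Optional<String> slot = pool.allocateAvailableSlot(previousAllocation);
            if (slot.isPresent()) {
                return slot.get();
            }
        }

        // 2. Otherwise take any available slot that is not on the blacklist,
        //    so that another execution's previous slot is not taken away.
        for (String candidate : candidates) {
            if (!blacklist.contains(candidate)) {
                Optional<String> slot = pool.allocateAvailableSlot(candidate);
                if (slot.isPresent()) {
                    return slot.get();
                }
            }
        }

        // 3. Nothing suitable is available: ask for a new slot.
        return pool.requestNewAllocatedSlot();
    }
}
```

Because listing and allocating are separate calls, the sketch only works without extra synchronization if both components run in the same thread, which matches the main-thread requirement stated in the first bullet.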
   
   
   ## Verifying this change
   
   
   This change is already covered by existing tests, but we still need to 
rewrite tests for the slot pool and add more tests in followup work.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   

