[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...
Github user StephanEwen closed the pull request at: https://github.com/apache/flink/pull/2447 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2447#discussion_r77318844

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java.orig ---
@@ -1,185 +0,0 @@
-/*
--- End diff --

Good catch :-)
[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2447#discussion_r77317670

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.runtime.instance.Slot;
+
+/**
+ * Interface for components that hold slots and to which slots get released / recycled.
+ */
+public interface SlotOwner {
+
+	boolean returnAllocatedSlot(Slot slot);
--- End diff --

JavaDocs are missing (even though the method is quite self-explanatory...)
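This sketch responds to the missing JavaDoc. It shows one way `returnAllocatedSlot` could be documented, plus a toy owner that exercises the contract. The `Slot` and `TrackingSlotOwner` classes are hypothetical simplifications. The meaning of the boolean return value is an assumption, not something stated in the diff: `true` if the slot was accepted, `false` if it was already returned or belongs to another owner.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for Flink's Slot (hypothetical, for illustration only). */
class Slot {
    private final String id;

    Slot(String id) {
        this.id = id;
    }

    String getId() {
        return id;
    }
}

/**
 * Interface for components that hold slots and to which slots get released / recycled.
 */
interface SlotOwner {

    /**
     * Returns a slot that was previously allocated from this owner, so that the owner
     * can recycle it or hand it out again.
     *
     * @param slot The slot to return; must have been allocated from this owner.
     * @return True, if the owner accepted the slot; false, if the slot was already
     *         returned or does not belong to this owner (assumed semantics).
     */
    boolean returnAllocatedSlot(Slot slot);
}

/** Toy owner that tracks which slots are currently handed out (illustrative only). */
class TrackingSlotOwner implements SlotOwner {
    private final List<Slot> allocated = new ArrayList<>();
    private final List<Slot> available = new ArrayList<>();

    Slot allocate(String id) {
        Slot slot = new Slot(id);
        allocated.add(slot);
        return slot;
    }

    @Override
    public boolean returnAllocatedSlot(Slot slot) {
        // Accept only slots that are currently allocated from this owner
        // (Slot does not override equals, so this is an identity check).
        if (allocated.remove(slot)) {
            available.add(slot);
            return true;
        }
        return false;
    }

    int availableCount() {
        return available.size();
    }
}
```

Returning a slot twice is rejected rather than silently recycling it a second time.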
[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2447#discussion_r77317356

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java ---
@@ -20,73 +20,125 @@
 import org.apache.flink.runtime.instance.SimpleSlot;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ *
+ */
 public class SlotAllocationFuture {
-
+
 	private final Object monitor = new Object();
-
+
 	private volatile SimpleSlot slot;
-
+
 	private volatile SlotAllocationFutureAction action;
-
+
 	//
+	/**
+	 * Creates a future that is uncompleted.
+	 */
 	public SlotAllocationFuture() {}
-
+
+	/**
+	 * Creates a future that is immediately completed.
+	 *
+	 * @param slot The task slot that completes the future.
+	 */
 	public SlotAllocationFuture(SimpleSlot slot) {
 		this.slot = slot;
 	}
-
+
 	//
-
-	public SimpleSlot waitTillAllocated() throws InterruptedException {
-		return waitTillAllocated(0);
-	}
-
-	public SimpleSlot waitTillAllocated(long timeout) throws InterruptedException {
+
+	public SimpleSlot waitTillCompleted() throws InterruptedException {
 		synchronized (monitor) {
 			while (slot == null) {
-				monitor.wait(timeout);
+				monitor.wait();
+			}
+			return slot;
+		}
+	}
+
+	public SimpleSlot waitTillCompleted(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+		checkArgument(timeout >= 0, "timeout may not be negative");
+		checkNotNull(timeUnit, "timeUnit");
+
+		if (timeout == 0) {
+			return waitTillCompleted();
+		} else {
+			final long deadline = System.nanoTime() + timeUnit.toNanos(timeout);
+			long millisToWait;
+
+			synchronized (monitor) {
+				while (slot == null && (millisToWait = (deadline - System.nanoTime()) / 1_000_000) > 0) {
+					monitor.wait(millisToWait);
+				}
+
+				if (slot != null) {
+					return slot;
+				} else {
+					throw new TimeoutException();
+				}
 			}
-
+		}
+	}
+
+	/**
+	 * Gets the slot from this future. This method throws an exception, if the future has not been completed.
+	 * This method never blocks.
+	 *
+	 * @return The slot with which this future was completed.
+	 * @throws IllegalStateException Thrown, if this method is called before the future is completed.
+	 */
+	public SimpleSlot get() {
+		final SimpleSlot slot = this.slot;
+		if (slot != null) {
 			return slot;
+		} else {
+			throw new IllegalStateException("The future is not complete - not slot available");
--- End diff --

+1 for Stephan's proposal
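For reference, the deadline-based wait loop in `waitTillCompleted(long, TimeUnit)` can be sketched in isolation. `DeadlineFuture` is a hypothetical minimal future, not a Flink class. It assumes the same "timeout 0 means wait forever" convention as the diff. One difference is offered only as a suggestion: the sketch waits for the remaining nanoseconds via `TimeUnit.timedWait` instead of whole milliseconds. The reason is that `(deadline - System.nanoTime()) / 1_000_000` becomes 0 once less than a millisecond remains, which ends the wait slightly early.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Hypothetical minimal future illustrating a deadline-based wait loop.
 * The type parameter stands in for SimpleSlot.
 */
class DeadlineFuture<T> {
    private final Object monitor = new Object();
    private volatile T value;

    /** Completes the future and wakes up all waiting threads. */
    void complete(T v) {
        synchronized (monitor) {
            value = v;
            monitor.notifyAll();
        }
    }

    /** Waits until completed or until the timeout expires. A timeout of 0 waits forever. */
    T waitTillCompleted(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        if (timeout < 0) {
            throw new IllegalArgumentException("timeout may not be negative");
        }
        synchronized (monitor) {
            if (timeout == 0) {
                while (value == null) {
                    monitor.wait();
                }
                return value;
            }
            // Absolute deadline on the monotonic clock, so spurious wake-ups
            // do not extend the total waiting time.
            final long deadline = System.nanoTime() + unit.toNanos(timeout);
            long remaining;
            while (value == null && (remaining = deadline - System.nanoTime()) > 0) {
                // Object.wait on 'monitor' for at most 'remaining' ns; releases the lock meanwhile.
                TimeUnit.NANOSECONDS.timedWait(monitor, remaining);
            }
            if (value != null) {
                return value;
            }
            throw new TimeoutException();
        }
    }
}
```

The loop re-checks both the value and the deadline after every wake-up, so spurious wake-ups are harmless.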
[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2447#discussion_r77314097

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java ---
@@ -41,7 +41,7 @@
 	private final int connectionIndex;
-	public ConnectionID(InstanceConnectionInfo connectionInfo, int connectionIndex) {
+	public ConnectionID(TaskManagerLocation connectionInfo, int connectionIndex) {
--- End diff --

variable name
[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2447#discussion_r77313861

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java ---
@@ -576,7 +560,7 @@ private void internalDisposeEmptySharedSlot(SharedSlot sharedSlot) {
 		if (parent == null) {
 			// root slot, return to the instance.
-			sharedSlot.getInstance().returnAllocatedSlot(sharedSlot);
+			sharedSlot.getOwner().returnAllocatedSlot(sharedSlot);
--- End diff --

Can we use `sharedSlot.releaseSlot` instead? It doesn't seem quite right to get the owner and then use it to call `returnAllocatedSlot` with the same slot. I think this violates the law of Demeter. Alternatively, we could add a method `returnSlot` to `Slot` which does: `owner.returnAllocatedSlot(this)`.
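Here is a minimal sketch of the suggested `returnSlot` alternative, using simplified, hypothetical `Slot` and `SlotOwner` types. The slot forwards itself to its owner. Under this assumption, the call site in `internalDisposeEmptySharedSlot` would shrink to `sharedSlot.returnSlot()` instead of reaching through `getOwner()`.

```java
/** Simplified stand-in for the SlotOwner interface from this pull request. */
interface SlotOwner {
    boolean returnAllocatedSlot(Slot slot);
}

/** Hypothetical, reduced Slot: it only knows its owner. */
class Slot {
    private final SlotOwner owner;

    Slot(SlotOwner owner) {
        this.owner = owner;
    }

    /**
     * Hands this slot back to its owner. Callers no longer fetch the owner
     * and pass the same slot back into it (law of Demeter).
     *
     * @return whatever the owner reports for the returned slot
     */
    boolean returnSlot() {
        return owner.returnAllocatedSlot(this);
    }
}
```

Keeping `getOwner()` out of the call sites also means the owner reference could later become private to `Slot` without touching the scheduler code.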
[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2447#discussion_r77311597

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java ---
@@ -147,7 +148,7 @@ public boolean reportHeartBeat(InstanceID instanceId, byte[] lastMetricsReport)
 	public InstanceID registerTaskManager(
 			ActorRef taskManager,
 			ResourceID resourceID,
--- End diff --

`ResourceID` should be contained in the `TaskManagerLocation`.
[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2447#discussion_r7737

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java ---
@@ -84,22 +90,22 @@
 	 * Constructs an instance reflecting a registered TaskManager.
 	 *
 	 * @param actorGateway The actor gateway to communicate with the remote instance
-	 * @param connectionInfo The remote connection where the task manager receives requests.
-	 * @param resourceId The resource id which denotes the resource the task manager uses.
+	 * @param location The remote connection where the task manager receives requests.
+	 * @param taskManagerId The resource id which denotes the resource the task manager uses.
 	 * @param id The id under which the taskManager is registered.
 	 * @param resources The resources available on the machine.
 	 * @param numberOfSlots The number of task slots offered by this taskManager.
 	 */
 	public Instance(
 			ActorGateway actorGateway,
-			InstanceConnectionInfo connectionInfo,
-			ResourceID resourceId,
+			TaskManagerLocation location,
+			ResourceID taskManagerId,
--- End diff --

Isn't the resource ID part of the `TaskManagerLocation`?
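To illustrate the point about the redundant parameter, here is a sketch in which the `ResourceID` travels inside the `TaskManagerLocation`, so the `Instance` constructor needs no separate ID. All three classes are simplified stand-ins, and the hostname/port fields are assumptions. The accessor names `getResourceID()`, `getTaskManagerID()` and `getTaskManagerLocation()` mirror names that appear in this pull request.

```java
import java.util.Objects;

/** Simplified stand-in for ResourceID: a value type compared by its string id. */
final class ResourceID {
    private final String id;

    ResourceID(String id) {
        this.id = Objects.requireNonNull(id);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof ResourceID && ((ResourceID) o).id.equals(id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }
}

/** Simplified stand-in for TaskManagerLocation: the ID plus (assumed) network coordinates. */
final class TaskManagerLocation {
    private final ResourceID resourceID;
    private final String hostname;
    private final int dataPort;

    TaskManagerLocation(ResourceID resourceID, String hostname, int dataPort) {
        this.resourceID = Objects.requireNonNull(resourceID);
        this.hostname = Objects.requireNonNull(hostname);
        this.dataPort = dataPort;
    }

    ResourceID getResourceID() { return resourceID; }
    String getHostname() { return hostname; }
    int getDataPort() { return dataPort; }
}

/** Hypothetical Instance whose constructor no longer takes a separate ResourceID. */
final class Instance {
    private final TaskManagerLocation location;
    private final int numberOfSlots;

    // The ID is derived from the location, so the two can never disagree.
    Instance(TaskManagerLocation location, int numberOfSlots) {
        this.location = Objects.requireNonNull(location);
        this.numberOfSlots = numberOfSlots;
    }

    ResourceID getTaskManagerID() { return location.getResourceID(); }
    TaskManagerLocation getTaskManagerLocation() { return location; }
    int getNumberOfSlots() { return numberOfSlots; }
}
```

The same reasoning would apply to `registerTaskManager(...)` in `InstanceManager`: if the location already carries the ID, passing both invites inconsistencies.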
[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2447#discussion_r77310730

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java ---
@@ -104,16 +107,17 @@ public String toString() {
 					(producerState == ExecutionState.RUNNING || producerState == ExecutionState.FINISHED)) {
-				final Instance partitionInstance = producerSlot.getInstance();
+				final TaskManagerLocation partitionTaskManagerLocation = producerSlot.getTaskManagerLocation();
+				final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
--- End diff --

But in the future, the partition might reside on a machine other than the producer's. So I guess the current name is fine after all.
[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2447#discussion_r77310357

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java ---
@@ -44,15 +44,15 @@
 	private final ResultPartitionID partitionID;
 	/** The partition connection info. */
-	private final InstanceConnectionInfo partitionConnectionInfo;
+	private final TaskManagerLocation partitionConnectionInfo;
 	/** The partition connection index. */
 	private final int partitionConnectionIndex;
 	public PartialInputChannelDeploymentDescriptor(
 			IntermediateDataSetID resultId,
 			ResultPartitionID partitionID,
-			InstanceConnectionInfo partitionConnectionInfo,
+			TaskManagerLocation partitionConnectionInfo,
--- End diff --

Shall we adapt the variable names as well? Otherwise it might be a little confusing that a `partitionConnectionInfo` is actually of type `TaskManagerLocation`.
[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2447#discussion_r77310204

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java ---
@@ -104,16 +107,17 @@ public String toString() {
 					(producerState == ExecutionState.RUNNING || producerState == ExecutionState.FINISHED)) {
-				final Instance partitionInstance = producerSlot.getInstance();
+				final TaskManagerLocation partitionTaskManagerLocation = producerSlot.getTaskManagerLocation();
+				final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
--- End diff --

Does it make sense to rename the variable to `producerTaskManager`? Then it is symmetric to `consumerTaskManager`.
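With the suggested naming, the locality decision reads symmetrically. This is a hypothetical sketch: plain strings stand in for `ResourceID` (only value equality matters here), and the `Location` enum is illustrative rather than Flink's actual partition-location type.

```java
import java.util.Objects;

/** Hypothetical sketch: choose a local or remote input channel by comparing TaskManager IDs. */
final class PartitionLocationSketch {

    /** Where the consumer should read the partition from (simplified). */
    enum Location { LOCAL, REMOTE }

    /**
     * @param producerTaskManager ID of the TaskManager that holds the produced partition
     * @param consumerTaskManager ID of the TaskManager that runs the consuming task
     */
    static Location choose(String producerTaskManager, String consumerTaskManager) {
        Objects.requireNonNull(producerTaskManager, "producerTaskManager");
        Objects.requireNonNull(consumerTaskManager, "consumerTaskManager");
        // Value equality on the IDs, not reference equality on Instance objects.
        return producerTaskManager.equals(consumerTaskManager) ? Location.LOCAL : Location.REMOTE;
    }
}
```

Because the comparison is by value, two separately constructed IDs for the same TaskManager still yield `LOCAL`.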
[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2447#discussion_r77309411

--- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala ---
@@ -19,20 +19,19 @@
 package org.apache.flink.mesos.runtime.clusterframework
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.instance.InstanceConnectionInfo
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration}
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation}
 /** An extension of the TaskManager that listens for additional Mesos-related
   * messages.
   */
 class MesosTaskManager(
     config: TaskManagerConfiguration,
     resourceID: ResourceID,
-    connectionInfo: InstanceConnectionInfo,
+    connectionInfo: TaskManagerLocation,
--- End diff --

Shall we rename the variable accordingly?
[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/2447#discussion_r77161750

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java ---
@@ -20,73 +20,125 @@
 import org.apache.flink.runtime.instance.SimpleSlot;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ *
+ */
 public class SlotAllocationFuture {
-
+
 	private final Object monitor = new Object();
-
+
 	private volatile SimpleSlot slot;
-
+
 	private volatile SlotAllocationFutureAction action;
-
+
 	//
+	/**
+	 * Creates a future that is uncompleted.
+	 */
 	public SlotAllocationFuture() {}
-
+
+	/**
+	 * Creates a future that is immediately completed.
+	 *
+	 * @param slot The task slot that completes the future.
+	 */
 	public SlotAllocationFuture(SimpleSlot slot) {
 		this.slot = slot;
 	}
-
+
 	//
-
-	public SimpleSlot waitTillAllocated() throws InterruptedException {
-		return waitTillAllocated(0);
-	}
-
-	public SimpleSlot waitTillAllocated(long timeout) throws InterruptedException {
+
+	public SimpleSlot waitTillCompleted() throws InterruptedException {
 		synchronized (monitor) {
 			while (slot == null) {
-				monitor.wait(timeout);
+				monitor.wait();
+			}
+			return slot;
+		}
+	}
+
+	public SimpleSlot waitTillCompleted(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+		checkArgument(timeout >= 0, "timeout may not be negative");
+		checkNotNull(timeUnit, "timeUnit");
+
+		if (timeout == 0) {
+			return waitTillCompleted();
+		} else {
+			final long deadline = System.nanoTime() + timeUnit.toNanos(timeout);
+			long millisToWait;
+
+			synchronized (monitor) {
+				while (slot == null && (millisToWait = (deadline - System.nanoTime()) / 1_000_000) > 0) {
+					monitor.wait(millisToWait);
+				}
+
+				if (slot != null) {
+					return slot;
+				} else {
+					throw new TimeoutException();
+				}
 			}
-
+		}
+	}
+
+	/**
+	 * Gets the slot from this future. This method throws an exception, if the future has not been completed.
+	 * This method never blocks.
+	 *
+	 * @return The slot with which this future was completed.
+	 * @throws IllegalStateException Thrown, if this method is called before the future is completed.
+	 */
+	public SimpleSlot get() {
+		final SimpleSlot slot = this.slot;
+		if (slot != null) {
 			return slot;
+		} else {
+			throw new IllegalStateException("The future is not complete - not slot available");
--- End diff --

We can do this here. In the long run, though, we should probably replace this future with a generic future. In that case, it would be back to a generic exception. It may be better to have the code that calls the future to get the slot catch the exception and re-throw a `SlotNotReadyException`. That would also work when we use a generic future later.
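One way to realize this proposal assumes the slot future is eventually replaced by a generic `java.util.concurrent.CompletableFuture`. The future stays generic, and the calling code translates the not-yet-completed state into a slot-specific checked exception. `SlotNotReadyException` is the name suggested in this discussion and `SlotFutures` is invented here. Neither is an existing Flink class.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical checked exception, as suggested in the review. */
class SlotNotReadyException extends Exception {
    SlotNotReadyException(String message) {
        super(message);
    }
}

/** Hypothetical caller-side helper around a generic future. */
final class SlotFutures {

    /**
     * Non-blocking access to a slot future. The future itself stays generic;
     * the caller turns "not completed yet" into a slot-specific exception.
     */
    static <T> T getSlotOrThrow(CompletableFuture<T> future) throws SlotNotReadyException {
        if (!future.isDone()) {
            throw new SlotNotReadyException("The slot future is not completed yet");
        }
        // join() does not block here because the future is already done; an
        // exceptionally completed future surfaces as an unchecked CompletionException.
        return future.join();
    }
}
```

The design choice is that the slot-specific exception lives at the call site. Swapping the future implementation later then does not change what callers see.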
[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...
Github user KurtYoung commented on a diff in the pull request:
https://github.com/apache/flink/pull/2447#discussion_r77101490

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java ---
@@ -20,73 +20,125 @@
 import org.apache.flink.runtime.instance.SimpleSlot;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ *
+ */
 public class SlotAllocationFuture {
-
+
 	private final Object monitor = new Object();
-
+
 	private volatile SimpleSlot slot;
-
+
 	private volatile SlotAllocationFutureAction action;
-
+
 	//
+	/**
+	 * Creates a future that is uncompleted.
+	 */
 	public SlotAllocationFuture() {}
-
+
+	/**
+	 * Creates a future that is immediately completed.
+	 *
+	 * @param slot The task slot that completes the future.
+	 */
 	public SlotAllocationFuture(SimpleSlot slot) {
 		this.slot = slot;
 	}
-
+
 	//
-
-	public SimpleSlot waitTillAllocated() throws InterruptedException {
-		return waitTillAllocated(0);
-	}
-
-	public SimpleSlot waitTillAllocated(long timeout) throws InterruptedException {
+
+	public SimpleSlot waitTillCompleted() throws InterruptedException {
 		synchronized (monitor) {
 			while (slot == null) {
-				monitor.wait(timeout);
+				monitor.wait();
+			}
+			return slot;
+		}
+	}
+
+	public SimpleSlot waitTillCompleted(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+		checkArgument(timeout >= 0, "timeout may not be negative");
+		checkNotNull(timeUnit, "timeUnit");
+
+		if (timeout == 0) {
+			return waitTillCompleted();
+		} else {
+			final long deadline = System.nanoTime() + timeUnit.toNanos(timeout);
+			long millisToWait;
+
+			synchronized (monitor) {
+				while (slot == null && (millisToWait = (deadline - System.nanoTime()) / 1_000_000) > 0) {
+					monitor.wait(millisToWait);
+				}
+
+				if (slot != null) {
+					return slot;
+				} else {
+					throw new TimeoutException();
+				}
 			}
-
+		}
+	}
+
+	/**
+	 * Gets the slot from this future. This method throws an exception, if the future has not been completed.
+	 * This method never blocks.
+	 *
+	 * @return The slot with which this future was completed.
+	 * @throws IllegalStateException Thrown, if this method is called before the future is completed.
+	 */
+	public SimpleSlot get() {
+		final SimpleSlot slot = this.slot;
+		if (slot != null) {
 			return slot;
+		} else {
+			throw new IllegalStateException("The future is not complete - not slot available");
--- End diff --

Can we throw an explicit exception like SlotNotReadyException instead of a RuntimeException?
[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...
GitHub user StephanEwen opened a pull request:
https://github.com/apache/flink/pull/2447

[FLINK-4490] [distributed coordination] Decouple the JobManager's slots from the 'Instance'

To allow for a future dynamic slot allocation and release model, the slots should not depend on 'Instance'. In this change, the Slots hold most of the necessary information directly (location, gateway) and interact with the Instance only via a 'SlotOwner' interface.

The `ResourceID`, which was introduced a while back, is a unique identifier for a TaskManager that is also constant for the lifetime of that particular TaskManager process. Where previously the JobManager compared `Instance` objects for referential equality to determine whether two slots refer to the same TaskManager, the code now always refers to the `ResourceID` instead.

*Side note: I suggest promoting the `ResourceID` to `TaskManagerID` at some point.*

This also reworks the `InstanceConnectionInfo` into the `TaskManagerLocation`, which contains the `ResourceID` in addition to the IP/hostname/port.

All logic of the Scheduler and ExecutionGraph remains unchanged.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink slots_works

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2447.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #2447

commit 198028f4ff1cc01e0e39bd389937a23b9ea45c7f
Author: Stephan Ewen
Date: 2016-08-29T14:58:31Z

[FLINK-4490] [distributed coordination] (part 1) Change InstanceConnectionInfo to TaskManagerLocation

This adds the ResourceId to the TaskManagerLocation

commit 02ad27e065e3f32b4503fac6ac5a3138a3b607e1
Author: Stephan Ewen
Date: 2016-08-30T18:34:20Z

[FLINK-4490] [distributed coordination] (part 2) Make slots independent of 'Instance'.

To allow for a future dynamic slot allocation and release model, the slots should not depend on 'Instance'. In this change, the Slots hold most of the necessary information directly (location, gateway) and interact with the Instance only via a 'SlotOwner' interface.

commit e7efae594015e2788f840b0fc3c685880fd35e85
Author: Stephan Ewen
Date: 2016-08-31T11:52:45Z

[FLINK-4525] [core] (followup) Remove remaining redundant code for pre-defined strictly local assignments.

commit 7b30d8f6246af60b7fceebb0cdde761aee89a714
Author: Stephan Ewen
Date: 2016-08-31T11:59:01Z

[FLINK-4490] [distributed coordination] (part 3) Rename methods on 'Instance' to have more intuitive names

getResourceID() --> getTaskManagerID()
getInstanceConnectionInfo() --> getTaskManagerLocation()
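The design described in this pull request can be sketched with simplified, hypothetical types. Each slot carries its TaskManager's `ResourceID` and talks back only through `SlotOwner`. Deciding whether two slots are on the same TaskManager uses `ResourceID` equality rather than a comparison of `Instance` references. This is an illustration under those assumptions, not the code from the patch.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Simplified stand-in for ResourceID: equal iff the underlying ids are equal. */
final class ResourceID {
    private final String id;

    ResourceID(String id) {
        this.id = Objects.requireNonNull(id);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof ResourceID && ((ResourceID) o).id.equals(id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }
}

/** Simplified stand-in for the SlotOwner interface. */
interface SlotOwner {
    boolean returnAllocatedSlot(Slot slot);
}

/** A slot that knows its TaskManager directly and reaches its holder only via SlotOwner. */
final class Slot {
    private final ResourceID taskManagerId;
    private final SlotOwner owner;

    Slot(ResourceID taskManagerId, SlotOwner owner) {
        this.taskManagerId = taskManagerId;
        this.owner = owner;
    }

    ResourceID getTaskManagerID() {
        return taskManagerId;
    }

    boolean release() {
        return owner.returnAllocatedSlot(this);
    }

    /** Same TaskManager iff the ResourceIDs are equal, regardless of which objects describe them. */
    static boolean sameTaskManager(Slot a, Slot b) {
        return a.taskManagerId.equals(b.taskManagerId);
    }
}

/** Toy Instance: one possible SlotOwner implementation (identity-based slot tracking). */
final class Instance implements SlotOwner {
    private final ResourceID taskManagerId;
    private final Set<Slot> allocated = new HashSet<>();

    Instance(ResourceID taskManagerId) {
        this.taskManagerId = taskManagerId;
    }

    Slot allocateSlot() {
        Slot slot = new Slot(taskManagerId, this);
        allocated.add(slot);
        return slot;
    }

    @Override
    public boolean returnAllocatedSlot(Slot slot) {
        return allocated.remove(slot);
    }
}
```

With referential equality, slots handed out by two distinct objects describing the same TaskManager would look unrelated. With `ResourceID` equality, they are correctly recognized as co-located.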