[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-09-19 Thread StephanEwen
Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/2447


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77318844
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java.orig
 ---
@@ -1,185 +0,0 @@
-/*
--- End diff --

Good catch :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77317670
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
 ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.runtime.instance.Slot;
+
+/**
+ * Interface for components that hold slots and to which slots get 
released / recycled.
+ */
+public interface SlotOwner {
+
+   boolean returnAllocatedSlot(Slot slot);
--- End diff --

JavaDocs are missing (even though the method is quite self-explanatory...)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77317356
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
 ---
@@ -20,73 +20,125 @@
 
 import org.apache.flink.runtime.instance.SimpleSlot;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * 
+ */
 public class SlotAllocationFuture {
-   
+
private final Object monitor = new Object();
-   
+
private volatile SimpleSlot slot;
-   
+
private volatile SlotAllocationFutureAction action;
-   
+
// 

 
+   /**
+* Creates a future that is uncompleted.
+*/
public SlotAllocationFuture() {}
-   
+
+   /**
+* Creates a future that is immediately completed.
+* 
+* @param slot The task slot that completes the future.
+*/
public SlotAllocationFuture(SimpleSlot slot) {
this.slot = slot;
}
-   
+
// 

-   
-   public SimpleSlot waitTillAllocated() throws InterruptedException {
-   return waitTillAllocated(0);
-   }
-   
-   public SimpleSlot waitTillAllocated(long timeout) throws 
InterruptedException {
+
+   public SimpleSlot waitTillCompleted() throws InterruptedException {
synchronized (monitor) {
while (slot == null) {
-   monitor.wait(timeout);
+   monitor.wait();
+   }
+   return slot;
+   }
+   }
+
+   public SimpleSlot waitTillCompleted(long timeout, TimeUnit timeUnit) 
throws InterruptedException, TimeoutException {
+   checkArgument(timeout >= 0, "timeout may not be negative");
+   checkNotNull(timeUnit, "timeUnit");
+
+   if (timeout == 0) {
+   return waitTillCompleted();
+   } else {
+   final long deadline = System.nanoTime() + 
timeUnit.toNanos(timeout);
+   long millisToWait;
+
+   synchronized (monitor) {
+   while (slot == null && (millisToWait = 
(deadline - System.nanoTime()) / 1_000_000) > 0) {
+   monitor.wait(millisToWait);
+   }
+
+   if (slot != null) {
+   return slot;
+   } else {
+   throw new TimeoutException();
+   }
}
-   
+   }
+   }
+
+   /**
+* Gets the slot from this future. This method throws an exception, if 
the future has not been completed.
+* This method never blocks.
+* 
+* @return The slot with which this future was completed.
+* @throws IllegalStateException Thrown, if this method is called 
before the future is completed.
+*/
+   public SimpleSlot get() {
+   final SimpleSlot slot = this.slot;
+   if (slot != null) {
return slot;
+   } else {
+   throw new IllegalStateException("The future is not 
complete - not slot available");
--- End diff --

+1 for Stephan's proposal


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77314097
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
 ---
@@ -41,7 +41,7 @@
 
private final int connectionIndex;
 
-   public ConnectionID(InstanceConnectionInfo connectionInfo, int 
connectionIndex) {
+   public ConnectionID(TaskManagerLocation connectionInfo, int 
connectionIndex) {
--- End diff --

variable name


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77313861
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
 ---
@@ -576,7 +560,7 @@ private void internalDisposeEmptySharedSlot(SharedSlot 
sharedSlot) {

if (parent == null) {
// root slot, return to the instance.
-   
sharedSlot.getInstance().returnAllocatedSlot(sharedSlot);
+   sharedSlot.getOwner().returnAllocatedSlot(sharedSlot);
--- End diff --

Can we use `sharedSlot.releaseSlot` instead? It seems not so right to get 
the owner which is then used to call `returnAllocatedSlot` with the same slot. 
I think this violates the law of Demeter.

Alternatively, we could add a method `returnSlot` to `Slot` which does: 
`owner.returnAllocatedSlot(this)`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77311597
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
 ---
@@ -147,7 +148,7 @@ public boolean reportHeartBeat(InstanceID instanceId, 
byte[] lastMetricsReport)
public InstanceID registerTaskManager(
ActorRef taskManager,
ResourceID resourceID,
--- End diff --

`ResourceID` should be contained in the `TaskManagerLocation`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r7737
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java ---
@@ -84,22 +90,22 @@
 * Constructs an instance reflecting a registered TaskManager.
 *
 * @param actorGateway The actor gateway to communicate with the remote 
instance
-* @param connectionInfo The remote connection where the task manager 
receives requests.
-* @param resourceId The resource id which denotes the resource the 
task manager uses.
+* @param location The remote connection where the task manager 
receives requests.
+* @param taskManagerId The resource id which denotes the resource the 
task manager uses.
 * @param id The id under which the taskManager is registered.
 * @param resources The resources available on the machine.
 * @param numberOfSlots The number of task slots offered by this 
taskManager.
 */
public Instance(
ActorGateway actorGateway,
-   InstanceConnectionInfo connectionInfo,
-   ResourceID resourceId,
+   TaskManagerLocation location,
+   ResourceID taskManagerId,
--- End diff --

Isn't the resource ID part of the `TaskManagerLocation`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77310730
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
 ---
@@ -104,16 +107,17 @@ public String toString() {
(producerState == ExecutionState.RUNNING
|| producerState == 
ExecutionState.FINISHED)) {
 
-   final Instance partitionInstance = 
producerSlot.getInstance();
+   final TaskManagerLocation 
partitionTaskManagerLocation = producerSlot.getTaskManagerLocation();
+   final ResourceID partitionTaskManager = 
partitionTaskManagerLocation.getResourceID();
--- End diff --

But in the future the partition might reside on a machine which was not the 
producer. Thus, I guess that it's good then.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77310357
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
 ---
@@ -44,15 +44,15 @@
private final ResultPartitionID partitionID;
 
/** The partition connection info. */
-   private final InstanceConnectionInfo partitionConnectionInfo;
+   private final TaskManagerLocation partitionConnectionInfo;
 
/** The partition connection index. */
private final int partitionConnectionIndex;
 
public PartialInputChannelDeploymentDescriptor(
IntermediateDataSetID resultId,
ResultPartitionID partitionID,
-   InstanceConnectionInfo partitionConnectionInfo,
+   TaskManagerLocation partitionConnectionInfo,
--- End diff --

Shall we adapt the variable names as well? Otherwise it might be a little 
confusing that a `partitionConnectionInfo` is actually of type 
`TaskManagerLocation`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77310204
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
 ---
@@ -104,16 +107,17 @@ public String toString() {
(producerState == ExecutionState.RUNNING
|| producerState == 
ExecutionState.FINISHED)) {
 
-   final Instance partitionInstance = 
producerSlot.getInstance();
+   final TaskManagerLocation 
partitionTaskManagerLocation = producerSlot.getTaskManagerLocation();
+   final ResourceID partitionTaskManager = 
partitionTaskManagerLocation.getResourceID();
--- End diff --

Does it make sense to rename the variable to `producerTaskManager`? Then it 
is symmetric to `consumerTaskManager`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-09-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77309411
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
 ---
@@ -19,20 +19,19 @@
 package org.apache.flink.mesos.runtime.clusterframework
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.instance.InstanceConnectionInfo
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.{TaskManager, 
TaskManagerConfiguration}
+import org.apache.flink.runtime.taskmanager.{TaskManager, 
TaskManagerConfiguration, TaskManagerLocation}
 
 /** An extension of the TaskManager that listens for additional 
Mesos-related
   * messages.
   */
 class MesosTaskManager(
 config: TaskManagerConfiguration,
 resourceID: ResourceID,
-connectionInfo: InstanceConnectionInfo,
+connectionInfo: TaskManagerLocation,
--- End diff --

Shall we rename the variable accordingly?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-09-01 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77161750
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
 ---
@@ -20,73 +20,125 @@
 
 import org.apache.flink.runtime.instance.SimpleSlot;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * 
+ */
 public class SlotAllocationFuture {
-   
+
private final Object monitor = new Object();
-   
+
private volatile SimpleSlot slot;
-   
+
private volatile SlotAllocationFutureAction action;
-   
+
// 

 
+   /**
+* Creates a future that is uncompleted.
+*/
public SlotAllocationFuture() {}
-   
+
+   /**
+* Creates a future that is immediately completed.
+* 
+* @param slot The task slot that completes the future.
+*/
public SlotAllocationFuture(SimpleSlot slot) {
this.slot = slot;
}
-   
+
// 

-   
-   public SimpleSlot waitTillAllocated() throws InterruptedException {
-   return waitTillAllocated(0);
-   }
-   
-   public SimpleSlot waitTillAllocated(long timeout) throws 
InterruptedException {
+
+   public SimpleSlot waitTillCompleted() throws InterruptedException {
synchronized (monitor) {
while (slot == null) {
-   monitor.wait(timeout);
+   monitor.wait();
+   }
+   return slot;
+   }
+   }
+
+   public SimpleSlot waitTillCompleted(long timeout, TimeUnit timeUnit) 
throws InterruptedException, TimeoutException {
+   checkArgument(timeout >= 0, "timeout may not be negative");
+   checkNotNull(timeUnit, "timeUnit");
+
+   if (timeout == 0) {
+   return waitTillCompleted();
+   } else {
+   final long deadline = System.nanoTime() + 
timeUnit.toNanos(timeout);
+   long millisToWait;
+
+   synchronized (monitor) {
+   while (slot == null && (millisToWait = 
(deadline - System.nanoTime()) / 1_000_000) > 0) {
+   monitor.wait(millisToWait);
+   }
+
+   if (slot != null) {
+   return slot;
+   } else {
+   throw new TimeoutException();
+   }
}
-   
+   }
+   }
+
+   /**
+* Gets the slot from this future. This method throws an exception, if 
the future has not been completed.
+* This method never blocks.
+* 
+* @return The slot with which this future was completed.
+* @throws IllegalStateException Thrown, if this method is called 
before the future is completed.
+*/
+   public SimpleSlot get() {
+   final SimpleSlot slot = this.slot;
+   if (slot != null) {
return slot;
+   } else {
+   throw new IllegalStateException("The future is not 
complete - not slot available");
--- End diff --

We can do this here. In the long run, we should probably replace this 
future by a generic future, through. In that case, it would be back to a 
generic exception.

It may be better to have the code that calls the future to get the slot 
catch the exception and re-throw a `SlotNotReadyException`. That would also 
work when we use a generic future later.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-08-31 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77101490
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
 ---
@@ -20,73 +20,125 @@
 
 import org.apache.flink.runtime.instance.SimpleSlot;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * 
+ */
 public class SlotAllocationFuture {
-   
+
private final Object monitor = new Object();
-   
+
private volatile SimpleSlot slot;
-   
+
private volatile SlotAllocationFutureAction action;
-   
+
// 

 
+   /**
+* Creates a future that is uncompleted.
+*/
public SlotAllocationFuture() {}
-   
+
+   /**
+* Creates a future that is immediately completed.
+* 
+* @param slot The task slot that completes the future.
+*/
public SlotAllocationFuture(SimpleSlot slot) {
this.slot = slot;
}
-   
+
// 

-   
-   public SimpleSlot waitTillAllocated() throws InterruptedException {
-   return waitTillAllocated(0);
-   }
-   
-   public SimpleSlot waitTillAllocated(long timeout) throws 
InterruptedException {
+
+   public SimpleSlot waitTillCompleted() throws InterruptedException {
synchronized (monitor) {
while (slot == null) {
-   monitor.wait(timeout);
+   monitor.wait();
+   }
+   return slot;
+   }
+   }
+
+   public SimpleSlot waitTillCompleted(long timeout, TimeUnit timeUnit) 
throws InterruptedException, TimeoutException {
+   checkArgument(timeout >= 0, "timeout may not be negative");
+   checkNotNull(timeUnit, "timeUnit");
+
+   if (timeout == 0) {
+   return waitTillCompleted();
+   } else {
+   final long deadline = System.nanoTime() + 
timeUnit.toNanos(timeout);
+   long millisToWait;
+
+   synchronized (monitor) {
+   while (slot == null && (millisToWait = 
(deadline - System.nanoTime()) / 1_000_000) > 0) {
+   monitor.wait(millisToWait);
+   }
+
+   if (slot != null) {
+   return slot;
+   } else {
+   throw new TimeoutException();
+   }
}
-   
+   }
+   }
+
+   /**
+* Gets the slot from this future. This method throws an exception, if 
the future has not been completed.
+* This method never blocks.
+* 
+* @return The slot with which this future was completed.
+* @throws IllegalStateException Thrown, if this method is called 
before the future is completed.
+*/
+   public SimpleSlot get() {
+   final SimpleSlot slot = this.slot;
+   if (slot != null) {
return slot;
+   } else {
+   throw new IllegalStateException("The future is not 
complete - not slot available");
--- End diff --

Can we throw a explicitly exception like SlotNotReadyException instead of 
RuntimeException? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

2016-08-31 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/2447

[FLINK-4490] [distributed coordination] Decouple the JobManage's slots from 
the 'Instance'

To allow for a future dynamic slot allocation and release model, the slots 
should not depend on 'Instance'.

In this change, the Slots hold most of the necessary information directly 
(location, gateway) and
the interact with the Instance only via a 'SlotOwner' interface.

The `ResourceID`, which has been introduces a while back, is a unique 
identifier for a TaskManager that is  also constant for the lifetime of that 
particular TaskManager process. Where previously, the JobManager comparted 
`Instance` objects for referential equality to determine if two slots refer to 
the same TaskManager, the code now always refers to the `ResourceID` instread.

*Side note: I suggest to promote the `ResourceID` to `TaskManagerID` at 
some point.

This also reworks the `InstanceConnectionInfo`to the `TaskManagerLocation`, 
which contains in addition to IP/hostname/port also the `ResourceID`.

All logic of the Scheduler and ExecutionGraph remains unchanged.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink slots_works

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2447.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2447


commit 198028f4ff1cc01e0e39bd389937a23b9ea45c7f
Author: Stephan Ewen 
Date:   2016-08-29T14:58:31Z

[FLINK-4490] [distributed coordination] (part 1) Change 
InstanceConnectionInfo to TaskManagerLocation

This adds the ResourceId to the TaskManagerLocation

commit 02ad27e065e3f32b4503fac6ac5a3138a3b607e1
Author: Stephan Ewen 
Date:   2016-08-30T18:34:20Z

[FLINK-4490] [distributed coordination] (part 2) Make slots independent of 
'Instance'.

To allow for a future dynamic slot allocation and release model, the slots 
should not depend on 'Instance'.
In this change, the Slots hold most of the necessary information directly 
(location, gateway) and
the interact with the Instance only via a 'SlotOwner' interface.

commit e7efae594015e2788f840b0fc3c685880fd35e85
Author: Stephan Ewen 
Date:   2016-08-31T11:52:45Z

[FLINK-4525] [core] (followup) Remove remaining redundant code for 
pre-defined strictly local assignments.

commit 7b30d8f6246af60b7fceebb0cdde761aee89a714
Author: Stephan Ewen 
Date:   2016-08-31T11:59:01Z

[FLINK-4490] [distributed coordination] (part 3) Rename methods on 
'Instance' to have more intuitive names

getResourceID() --> getTaskManagerID()
getInstanceConnectionInfo() --> getTaskManagerLocation()




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---