[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15504516#comment-15504516
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2447
  
Was manually merged as of eac6088a75e813a015b778f4cfc4cce0cf2a53ce


> Decouple Slot and Instance
> --
>
> Key: FLINK-4490
> URL: https://issues.apache.org/jira/browse/FLINK-4490
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Kurt Young
>Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
> Currently, {{Slot}} and {{Instance}} hold references to each other. For {{Instance}}
> holding {{Slot}}, this makes sense because it reflects how many resources the instance
> can provide and how many are in use.
> But it's not really necessary for {{Slot}} to hold the {{Instance}} it
> belongs to. It only needs some connection information and a gateway to
> talk to. Another downside of {{Slot}} holding {{Instance}} is that
> {{Instance}} contains allocation/de-allocation logic, which makes it
> difficult to refactor allocation without {{Slot}} noticing.
> We should abstract the connection information of {{Instance}} for {{Slot}}
> to hold. (We already have {{InstanceConnectionInfo}}, but it lacks the
> instance's Akka gateway; maybe we can just add the Akka gateway to
> {{InstanceConnectionInfo}}.)
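The proposal above can be sketched as follows. This is a minimal, hypothetical illustration, not Flink's code: `TaskManagerGateway` stands in for the Akka gateway, and this simplified `InstanceConnectionInfo` (hostname, data port, gateway) is an assumption modeled on the suggestion to add the gateway to the existing class. The `Slot` keeps only the connection information, while all allocation bookkeeping stays inside `Instance`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Akka gateway used to talk to a TaskManager.
interface TaskManagerGateway {
    String path();
}

// Simplified connection information that also carries the gateway (an assumption, per the issue's suggestion).
final class InstanceConnectionInfo {
    private final String hostname;
    private final int dataPort;
    private final TaskManagerGateway gateway;

    InstanceConnectionInfo(String hostname, int dataPort, TaskManagerGateway gateway) {
        this.hostname = hostname;
        this.dataPort = dataPort;
        this.gateway = gateway;
    }

    String getHostname() { return hostname; }
    int getDataPort() { return dataPort; }
    TaskManagerGateway getGateway() { return gateway; }
}

// A slot knows where it lives and how to reach its TaskManager, but has no reference to Instance.
final class Slot {
    private final int slotNumber;
    private final InstanceConnectionInfo connectionInfo;

    Slot(int slotNumber, InstanceConnectionInfo connectionInfo) {
        this.slotNumber = slotNumber;
        this.connectionInfo = connectionInfo;
    }

    int getSlotNumber() { return slotNumber; }
    InstanceConnectionInfo getConnectionInfo() { return connectionInfo; }
}

// The Instance still holds its slots and keeps all allocation logic to itself.
final class Instance {
    private final InstanceConnectionInfo connectionInfo;
    private final List<Integer> freeSlotNumbers = new ArrayList<>();
    private final List<Slot> allocatedSlots = new ArrayList<>();

    Instance(InstanceConnectionInfo connectionInfo, int numberOfSlots) {
        this.connectionInfo = connectionInfo;
        for (int i = 0; i < numberOfSlots; i++) {
            freeSlotNumbers.add(i);
        }
    }

    /** Returns a new slot, or null if all slots are in use. */
    synchronized Slot allocateSlot() {
        if (freeSlotNumbers.isEmpty()) {
            return null;
        }
        Slot slot = new Slot(freeSlotNumbers.remove(0), connectionInfo);
        allocatedSlots.add(slot);
        return slot;
    }

    /** Takes a slot back; returns false if it was not allocated from this instance. */
    synchronized boolean returnSlot(Slot slot) {
        if (!allocatedSlots.remove(slot)) {
            return false;
        }
        freeSlotNumbers.add(slot.getSlotNumber());
        return true;
    }

    synchronized int getNumberOfAvailableSlots() { return freeSlotNumbers.size(); }
}
```

Because `Slot` never references `Instance` here, the allocation logic in `Instance` can change without `Slot` noticing.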



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15504517#comment-15504517
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/2447


[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15458119#comment-15458119
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2447
  
We can either do that follow-up, or we can add an RPC gateway in the
`flip-6` branch and remove the `ActorGateway` after merging the feature and
master branches.

Will address the comments and merge...


[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15458061#comment-15458061
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2447
  
Really good changes @StephanEwen. +1 for merging.

I had only some minor comments. 

I guess that replacing the `ActorGateways` in the `Slots` and preparing the
`Scheduler` to work with futures are the next follow-up tasks.
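One possible shape for a future-based slot API, assuming Java 8's `CompletableFuture`. The class `FutureSlotProvider`, its methods, and the use of an `Integer` slot number in place of a real slot are all hypothetical, not part of the PR. Requests complete immediately when a slot is free and are otherwise queued until a slot is returned, so callers can chain work onto the future instead of blocking in a method like `waitTillAllocated()`.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical slot provider: a request completes at once if a slot is free,
// otherwise it is queued and completed when some slot is returned.
final class FutureSlotProvider {
    private final Queue<CompletableFuture<Integer>> pendingRequests = new ArrayDeque<>();
    private final Queue<Integer> freeSlots = new ArrayDeque<>();

    FutureSlotProvider(int numberOfSlots) {
        for (int i = 0; i < numberOfSlots; i++) {
            freeSlots.add(i);
        }
    }

    synchronized CompletableFuture<Integer> requestSlot() {
        Integer slot = freeSlots.poll();
        if (slot != null) {
            return CompletableFuture.completedFuture(slot);
        }
        CompletableFuture<Integer> future = new CompletableFuture<>();
        pendingRequests.add(future);
        return future;
    }

    void returnSlot(int slot) {
        while (true) {
            CompletableFuture<Integer> next;
            synchronized (this) {
                next = pendingRequests.poll();
                if (next == null) {
                    freeSlots.add(slot);
                    return;
                }
            }
            // Complete outside the lock so dependent callbacks cannot deadlock on this provider;
            // if the request was cancelled in the meantime, offer the slot to the next one.
            if (next.complete(slot)) {
                return;
            }
        }
    }
}
```

Non-async callbacks such as `thenAccept` run in the thread that completes the future, which is why completion happens outside the `synchronized` block.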


[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15458051#comment-15458051
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77318844
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java.orig
 ---
@@ -1,185 +0,0 @@
-/*
--- End diff --

Good catch :-)


[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15458030#comment-15458030
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77317670
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
 ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.runtime.instance.Slot;
+
+/**
+ * Interface for components that hold slots and to which slots get released / recycled.
+ */
+public interface SlotOwner {
+
+   boolean returnAllocatedSlot(Slot slot);
--- End diff --

The JavaDocs are missing (even though the method is quite self-explanatory).


[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15458026#comment-15458026
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77317356
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
 ---
@@ -20,73 +20,125 @@
 
 import org.apache.flink.runtime.instance.SimpleSlot;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * 
+ */
 public class SlotAllocationFuture {
-   
+
private final Object monitor = new Object();
-   
+
private volatile SimpleSlot slot;
-   
+
private volatile SlotAllocationFutureAction action;
-   
+
// 

 
+   /**
+* Creates a future that is uncompleted.
+*/
public SlotAllocationFuture() {}
-   
+
+   /**
+* Creates a future that is immediately completed.
+* 
+* @param slot The task slot that completes the future.
+*/
public SlotAllocationFuture(SimpleSlot slot) {
this.slot = slot;
}
-   
+
// 

-   
-   public SimpleSlot waitTillAllocated() throws InterruptedException {
-   return waitTillAllocated(0);
-   }
-   
-   public SimpleSlot waitTillAllocated(long timeout) throws InterruptedException {
+
+   public SimpleSlot waitTillCompleted() throws InterruptedException {
synchronized (monitor) {
while (slot == null) {
-   monitor.wait(timeout);
+   monitor.wait();
+   }
+   return slot;
+   }
+   }
+
+   public SimpleSlot waitTillCompleted(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+   checkArgument(timeout >= 0, "timeout may not be negative");
+   checkNotNull(timeUnit, "timeUnit");
+
+   if (timeout == 0) {
+   return waitTillCompleted();
+   } else {
+   final long deadline = System.nanoTime() + timeUnit.toNanos(timeout);
+   long millisToWait;
+
+   synchronized (monitor) {
+   while (slot == null && (millisToWait = (deadline - System.nanoTime()) / 1_000_000) > 0) {
+   monitor.wait(millisToWait);
+   }
+
+   if (slot != null) {
+   return slot;
+   } else {
+   throw new TimeoutException();
+   }
}
-   
+   }
+   }
+
+   /**
+* Gets the slot from this future. This method throws an exception, if the future has not been completed.
+* This method never blocks.
+* 
+* @return The slot with which this future was completed.
+* @throws IllegalStateException Thrown, if this method is called before the future is completed.
+*/
+   public SimpleSlot get() {
+   final SimpleSlot slot = this.slot;
+   if (slot != null) {
return slot;
+   } else {
+   throw new IllegalStateException("The future is not complete - not slot available");
--- End diff --

+1 for Stephan's proposal
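The timed `waitTillCompleted` in the diff above recomputes the remaining time from a fixed `System.nanoTime()` deadline on every wake-up, so spurious wake-ups do not extend the total wait. A minimal standalone sketch of that pattern follows; the class name `SimpleSlotFuture` and its `complete` method are hypothetical, and a `String` stands in for `SimpleSlot`. One deliberate difference: it waits via `TimeUnit.timedWait` on the remaining nanoseconds instead of dividing down to milliseconds.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical one-shot future guarded by a monitor; a String stands in for SimpleSlot.
final class SimpleSlotFuture {
    private final Object monitor = new Object();
    private volatile String slot;

    /** Completes the future and wakes all waiters; returns false if it was already completed. */
    boolean complete(String value) {
        if (value == null) {
            throw new NullPointerException("value");
        }
        synchronized (monitor) {
            if (slot != null) {
                return false;
            }
            slot = value;
            monitor.notifyAll();
            return true;
        }
    }

    /** Never blocks; throws if the future has not been completed yet. */
    String get() {
        final String s = slot;
        if (s == null) {
            throw new IllegalStateException("The future is not complete - no slot available");
        }
        return s;
    }

    /** Waits at most the given time; a timeout of 0 waits indefinitely, as in the reviewed code. */
    String waitTillCompleted(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        if (timeout < 0) {
            throw new IllegalArgumentException("timeout may not be negative");
        }
        synchronized (monitor) {
            if (timeout == 0) {
                while (slot == null) {
                    monitor.wait();
                }
                return slot;
            }
            final long deadline = System.nanoTime() + unit.toNanos(timeout);
            long remainingNanos;
            // Re-check the condition after every wake-up; the deadline itself never moves.
            while (slot == null && (remainingNanos = deadline - System.nanoTime()) > 0) {
                // timedWait maps to Object.wait(millis, nanos), so the loop does not
                // give up while less than 1 ms remains.
                TimeUnit.NANOSECONDS.timedWait(monitor, remainingNanos);
            }
            if (slot == null) {
                throw new TimeoutException();
            }
            return slot;
        }
    }
}
```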


[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15457980#comment-15457980
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77314097
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
 ---
@@ -41,7 +41,7 @@
 
private final int connectionIndex;
 
-   public ConnectionID(InstanceConnectionInfo connectionInfo, int connectionIndex) {
+   public ConnectionID(TaskManagerLocation connectionInfo, int connectionIndex) {
--- End diff --

Variable name: `connectionInfo` should be renamed to match the new type `TaskManagerLocation`.


[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15457976#comment-15457976
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77313861
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
 ---
@@ -576,7 +560,7 @@ private void internalDisposeEmptySharedSlot(SharedSlot sharedSlot) {

if (parent == null) {
// root slot, return to the instance.
-   sharedSlot.getInstance().returnAllocatedSlot(sharedSlot);
+   sharedSlot.getOwner().returnAllocatedSlot(sharedSlot);
--- End diff --

Can we use `sharedSlot.releaseSlot` instead? It does not seem right to get
the owner only to call `returnAllocatedSlot` on it with the same slot.
I think this violates the Law of Demeter.

Alternatively, we could add a method `returnSlot` to `Slot` which does
`owner.returnAllocatedSlot(this)`.
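The `returnSlot` alternative can be sketched with simplified, hypothetical classes rather than Flink's: `Slot.returnSlot()` delegates to its owner, so callers never reach through `getOwner()`. The sketch also shows the kind of JavaDoc requested for `SlotOwner.returnAllocatedSlot`; the documented return-value semantics and the toy `SlotPool` owner are assumptions.

```java
import java.util.HashSet;
import java.util.Set;

/** Interface for components that hold slots and to which slots get released / recycled. */
interface SlotOwner {

    /**
     * Returns a slot that this owner previously handed out, so that it can be reused.
     *
     * @param slot The slot to return.
     * @return True, if the owner accepted the slot; false, if the slot was not allocated
     *         from this owner or has already been returned.
     */
    boolean returnAllocatedSlot(Slot slot);
}

class Slot {
    private final SlotOwner owner;

    Slot(SlotOwner owner) {
        this.owner = owner;
    }

    /** Hypothetical helper: the slot returns itself, so callers need not call getOwner(). */
    boolean returnSlot() {
        return owner.returnAllocatedSlot(this);
    }
}

// Toy owner that tracks which slots are currently handed out.
class SlotPool implements SlotOwner {
    private final Set<Slot> allocated = new HashSet<>();
    private int freeSlots;

    SlotPool(int numberOfSlots) {
        this.freeSlots = numberOfSlots;
    }

    synchronized Slot allocate() {
        if (freeSlots == 0) {
            return null;
        }
        freeSlots--;
        Slot slot = new Slot(this);
        allocated.add(slot);
        return slot;
    }

    @Override
    public synchronized boolean returnAllocatedSlot(Slot slot) {
        if (!allocated.remove(slot)) {
            return false;
        }
        freeSlots++;
        return true;
    }

    synchronized int getFreeSlots() {
        return freeSlots;
    }
}
```

A call site would then read `sharedSlot.returnSlot()` instead of `sharedSlot.getOwner().returnAllocatedSlot(sharedSlot)`.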


[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15457941#comment-15457941
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77311597
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
 ---
@@ -147,7 +148,7 @@ public boolean reportHeartBeat(InstanceID instanceId, byte[] lastMetricsReport)
public InstanceID registerTaskManager(
ActorRef taskManager,
ResourceID resourceID,
--- End diff --

`ResourceID` should be contained in the `TaskManagerLocation`.


[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15457936#comment-15457936
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r7737
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java ---
@@ -84,22 +90,22 @@
 * Constructs an instance reflecting a registered TaskManager.
 *
 * @param actorGateway The actor gateway to communicate with the remote instance
-* @param connectionInfo The remote connection where the task manager receives requests.
-* @param resourceId The resource id which denotes the resource the task manager uses.
+* @param location The remote connection where the task manager receives requests.
+* @param taskManagerId The resource id which denotes the resource the task manager uses.
 * @param id The id under which the taskManager is registered.
 * @param resources The resources available on the machine.
 * @param numberOfSlots The number of task slots offered by this taskManager.
 */
public Instance(
ActorGateway actorGateway,
-   InstanceConnectionInfo connectionInfo,
-   ResourceID resourceId,
+   TaskManagerLocation location,
+   ResourceID taskManagerId,
--- End diff --

Isn't the resource ID part of the `TaskManagerLocation`?
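If `TaskManagerLocation` carries the `ResourceID`, the constructor no longer needs a separate `taskManagerId` parameter that could disagree with the location. A minimal sketch with simplified, hypothetical versions of these classes (a `String`-backed `ResourceID`, and no actor gateway or hardware description):

```java
import java.util.Objects;

// Simplified, hypothetical stand-in for Flink's ResourceID.
final class ResourceID {
    private final String id;

    ResourceID(String id) {
        this.id = Objects.requireNonNull(id);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof ResourceID && ((ResourceID) o).id.equals(id);
    }

    @Override
    public int hashCode() { return id.hashCode(); }

    @Override
    public String toString() { return id; }
}

// The location owns the resource ID together with the network coordinates.
final class TaskManagerLocation {
    private final ResourceID resourceID;
    private final String hostname;
    private final int dataPort;

    TaskManagerLocation(ResourceID resourceID, String hostname, int dataPort) {
        this.resourceID = Objects.requireNonNull(resourceID);
        this.hostname = Objects.requireNonNull(hostname);
        this.dataPort = dataPort;
    }

    ResourceID getResourceID() { return resourceID; }
    String getHostname() { return hostname; }
    int getDataPort() { return dataPort; }
}

// The Instance derives the TaskManager ID from the location instead of taking it separately.
final class Instance {
    private final TaskManagerLocation location;
    private final int numberOfSlots;

    Instance(TaskManagerLocation location, int numberOfSlots) {
        this.location = Objects.requireNonNull(location);
        if (numberOfSlots <= 0) {
            throw new IllegalArgumentException("numberOfSlots must be positive");
        }
        this.numberOfSlots = numberOfSlots;
    }

    ResourceID getTaskManagerID() { return location.getResourceID(); }
    TaskManagerLocation getTaskManagerLocation() { return location; }
    int getNumberOfSlots() { return numberOfSlots; }
}
```

The same reasoning applies to `InstanceManager.registerTaskManager`, whose separate `ResourceID` argument could likewise be read from the location.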


[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15457930#comment-15457930
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77310730
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
 ---
@@ -104,16 +107,17 @@ public String toString() {
(producerState == ExecutionState.RUNNING
|| producerState == ExecutionState.FINISHED)) {
 
-   final Instance partitionInstance = producerSlot.getInstance();
+   final TaskManagerLocation partitionTaskManagerLocation = producerSlot.getTaskManagerLocation();
+   final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
--- End diff --

But in the future, the partition might reside on a machine other than the
producer's. So I guess the current name is fine after all.


[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15457918#comment-15457918
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77310204
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
 ---
@@ -104,16 +107,17 @@ public String toString() {
(producerState == ExecutionState.RUNNING
|| producerState == ExecutionState.FINISHED)) {
 
-   final Instance partitionInstance = producerSlot.getInstance();
+   final TaskManagerLocation partitionTaskManagerLocation = producerSlot.getTaskManagerLocation();
+   final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
--- End diff --

Does it make sense to rename the variable to `producerTaskManager`? Then it
would be symmetric to `consumerTaskManager`.


[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15457902#comment-15457902
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77309411
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
 ---
@@ -19,20 +19,19 @@
 package org.apache.flink.mesos.runtime.clusterframework
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.instance.InstanceConnectionInfo
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration}
+import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation}
 
 /** An extension of the TaskManager that listens for additional Mesos-related
   * messages.
   */
 class MesosTaskManager(
 config: TaskManagerConfiguration,
 resourceID: ResourceID,
-connectionInfo: InstanceConnectionInfo,
+connectionInfo: TaskManagerLocation,
--- End diff --

Shall we rename the variable accordingly?


[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15457779#comment-15457779
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2447
  
Looking into it now


[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15455206#comment-15455206
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77161750
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java ---
@@ -20,73 +20,125 @@
 
 import org.apache.flink.runtime.instance.SimpleSlot;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * 
+ */
 public class SlotAllocationFuture {
-   
+
private final Object monitor = new Object();
-   
+
private volatile SimpleSlot slot;
-   
+
private volatile SlotAllocationFutureAction action;
-   
+
// 

 
+   /**
+* Creates a future that is uncompleted.
+*/
public SlotAllocationFuture() {}
-   
+
+   /**
+* Creates a future that is immediately completed.
+* 
+* @param slot The task slot that completes the future.
+*/
public SlotAllocationFuture(SimpleSlot slot) {
this.slot = slot;
}
-   
+
// 

-   
-   public SimpleSlot waitTillAllocated() throws InterruptedException {
-   return waitTillAllocated(0);
-   }
-   
-   public SimpleSlot waitTillAllocated(long timeout) throws InterruptedException {
+
+   public SimpleSlot waitTillCompleted() throws InterruptedException {
synchronized (monitor) {
while (slot == null) {
-   monitor.wait(timeout);
+   monitor.wait();
+   }
+   return slot;
+   }
+   }
+
+   public SimpleSlot waitTillCompleted(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+   checkArgument(timeout >= 0, "timeout may not be negative");
+   checkNotNull(timeUnit, "timeUnit");
+
+   if (timeout == 0) {
+   return waitTillCompleted();
+   } else {
+   final long deadline = System.nanoTime() + timeUnit.toNanos(timeout);
+   long millisToWait;
+
+   synchronized (monitor) {
+   while (slot == null && (millisToWait = (deadline - System.nanoTime()) / 1_000_000) > 0) {
+   monitor.wait(millisToWait);
+   }
+
+   if (slot != null) {
+   return slot;
+   } else {
+   throw new TimeoutException();
+   }
}
-   
+   }
+   }
+
+   /**
+* Gets the slot from this future. This method throws an exception, if the future has not been completed.
+* This method never blocks.
+* 
+* @return The slot with which this future was completed.
+* @throws IllegalStateException Thrown, if this method is called before the future is completed.
+*/
+   public SimpleSlot get() {
+   final SimpleSlot slot = this.slot;
+   if (slot != null) {
return slot;
+   } else {
+   throw new IllegalStateException("The future is not complete - not slot available");
--- End diff --

We can do this here. In the long run, we should probably replace this 
future with a generic future, though. In that case, it would be back to a 
generic exception.

It may be better to have the code that calls the future to get the slot 
catch the exception and re-throw a `SlotNotReadyException`. That would also 
work when we use a generic future later.
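A sketch of the caller-side translation suggested above, assuming a hypothetical `SlotNotReadyException` and a stand-in `NonBlockingFuture` whose `get()` throws `IllegalStateException`, like the `SlotAllocationFuture#get()` in the diff. Only `requireSlot` knows about the slot-specific exception, so swapping in a generic future later would not change it.

```java
// Hypothetical checked exception suggested in the review; not an existing Flink class.
class SlotNotReadyException extends Exception {
    SlotNotReadyException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Stand-in for a generic future whose non-blocking get() fails with IllegalStateException
// when it has not been completed yet.
final class NonBlockingFuture<T> {
    private volatile T value;

    void complete(T value) {
        this.value = value;
    }

    T get() {
        final T v = value;
        if (v == null) {
            throw new IllegalStateException("The future is not complete");
        }
        return v;
    }
}

final class SlotCaller {
    // The caller, not the future, translates the generic exception into the slot-specific one.
    static <S> S requireSlot(NonBlockingFuture<S> future) throws SlotNotReadyException {
        try {
            return future.get();
        } catch (IllegalStateException e) {
            throw new SlotNotReadyException("No slot available yet", e);
        }
    }
}
```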



[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15453958#comment-15453958
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/2447#discussion_r77101490
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java ---
@@ -20,73 +20,125 @@
 
 import org.apache.flink.runtime.instance.SimpleSlot;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * 
+ */
 public class SlotAllocationFuture {
-   
+
private final Object monitor = new Object();
-   
+
private volatile SimpleSlot slot;
-   
+
private volatile SlotAllocationFutureAction action;
-   
+
// 

 
+   /**
+* Creates a future that is uncompleted.
+*/
public SlotAllocationFuture() {}
-   
+
+   /**
+* Creates a future that is immediately completed.
+* 
+* @param slot The task slot that completes the future.
+*/
public SlotAllocationFuture(SimpleSlot slot) {
this.slot = slot;
}
-   
+
// 

-   
-   public SimpleSlot waitTillAllocated() throws InterruptedException {
-   return waitTillAllocated(0);
-   }
-   
-   public SimpleSlot waitTillAllocated(long timeout) throws InterruptedException {
+
+   public SimpleSlot waitTillCompleted() throws InterruptedException {
synchronized (monitor) {
while (slot == null) {
-   monitor.wait(timeout);
+   monitor.wait();
+   }
+   return slot;
+   }
+   }
+
+   public SimpleSlot waitTillCompleted(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+   checkArgument(timeout >= 0, "timeout may not be negative");
+   checkNotNull(timeUnit, "timeUnit");
+
+   if (timeout == 0) {
+   return waitTillCompleted();
+   } else {
+   final long deadline = System.nanoTime() + timeUnit.toNanos(timeout);
+   long millisToWait;
+
+   synchronized (monitor) {
+   while (slot == null && (millisToWait = (deadline - System.nanoTime()) / 1_000_000) > 0) {
+   monitor.wait(millisToWait);
+   }
+
+   if (slot != null) {
+   return slot;
+   } else {
+   throw new TimeoutException();
+   }
}
-   
+   }
+   }
+
+   /**
+* Gets the slot from this future. This method throws an exception, if the future has not been completed.
+* This method never blocks.
+* 
+* @return The slot with which this future was completed.
+* @throws IllegalStateException Thrown, if this method is called before the future is completed.
+*/
+   public SimpleSlot get() {
+   final SimpleSlot slot = this.slot;
+   if (slot != null) {
return slot;
+   } else {
+   throw new IllegalStateException("The future is not complete - not slot available");
--- End diff --

Can we throw an explicit exception like SlotNotReadyException instead of a 
RuntimeException? 
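For comparison, a sketch of the alternative raised here, where the future itself declares a dedicated checked exception. `SlotNotReadyException` and `ExplicitSlotFuture` are hypothetical names used for illustration only; the trade-off is that every caller of `get()` is forced by the compiler to handle the not-ready case.

```java
import java.util.Objects;

// Hypothetical dedicated exception, as proposed in the review comment (not an existing Flink class).
class SlotNotReadyException extends Exception {
    SlotNotReadyException(String message) {
        super(message);
    }
}

// Minimal stand-in for the slot future in the diff, with get() declaring the explicit exception.
final class ExplicitSlotFuture<S> {
    private volatile S slot;

    void complete(S slot) {
        this.slot = Objects.requireNonNull(slot, "slot");
    }

    // Never blocks; callers must handle the not-ready case explicitly.
    S get() throws SlotNotReadyException {
        final S s = slot;
        if (s == null) {
            throw new SlotNotReadyException("The future is not complete - no slot available");
        }
        return s;
    }
}
```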



[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-08-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15452798#comment-15452798
 ] 

ASF GitHub Bot commented on FLINK-4490:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/2447

[FLINK-4490] [distributed coordination] Decouple the JobManager's slots from the 'Instance'

To allow for a future dynamic slot allocation and release model, the slots 
should not depend on 'Instance'.

In this change, the Slots hold most of the necessary information directly 
(location, gateway) and interact with the Instance only via a 'SlotOwner' 
interface.
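A minimal sketch of the ownership split described above, assuming a hypothetical `SlotOwner` method named `returnSlot` and plain strings standing in for the location and gateway types. The slot depends only on the interface; `ToyInstance` is a hypothetical class playing the `Instance` role.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// The only view a slot has of its owner (e.g. the Instance). The method name is an assumption.
interface SlotOwner {
    boolean returnSlot(OwnedSlot slot);
}

// A slot that carries its location and gateway directly and talks to its owner only via SlotOwner.
final class OwnedSlot {
    private final String location; // stand-in for the TaskManager location
    private final String gateway;  // stand-in for the TaskManager gateway
    private final SlotOwner owner;
    private boolean released;

    OwnedSlot(String location, String gateway, SlotOwner owner) {
        this.location = Objects.requireNonNull(location, "location");
        this.gateway = Objects.requireNonNull(gateway, "gateway");
        this.owner = Objects.requireNonNull(owner, "owner");
    }

    String getLocation() {
        return location;
    }

    String getGateway() {
        return gateway;
    }

    // Releasing hands the slot back through the interface; a second release is a no-op.
    synchronized boolean release() {
        if (released) {
            return false;
        }
        released = true;
        return owner.returnSlot(this);
    }
}

// A toy owner that tracks returned slots.
final class ToyInstance implements SlotOwner {
    final List<OwnedSlot> freeSlots = new ArrayList<>();

    @Override
    public boolean returnSlot(OwnedSlot slot) {
        return freeSlots.add(slot);
    }
}
```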

The `ResourceID`, which was introduced a while back, is a unique 
identifier for a TaskManager that is also constant for the lifetime of that 
particular TaskManager process. Where the JobManager previously compared 
`Instance` objects for referential equality to determine whether two slots refer to 
the same TaskManager, the code now always refers to the `ResourceID` instead.
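The change from reference equality on `Instance` to identifier equality can be sketched as follows; `ResourceIdSketch` and `SlotRef` are hypothetical stand-ins for `ResourceID` and a slot, not the Flink classes. Two slots count as being on the same TaskManager when their identifiers are equal, even if the identifier objects are distinct.

```java
import java.util.Objects;
import java.util.UUID;

// Stand-in for ResourceID: a value identifier, stable for the lifetime of one TaskManager process.
final class ResourceIdSketch {
    private final String id;

    ResourceIdSketch(String id) {
        this.id = Objects.requireNonNull(id, "id");
    }

    static ResourceIdSketch generate() {
        return new ResourceIdSketch(UUID.randomUUID().toString());
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof ResourceIdSketch && id.equals(((ResourceIdSketch) o).id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }
}

final class SlotRef {
    final ResourceIdSketch taskManagerId;
    final int slotNumber;

    SlotRef(ResourceIdSketch taskManagerId, int slotNumber) {
        this.taskManagerId = Objects.requireNonNull(taskManagerId, "taskManagerId");
        this.slotNumber = slotNumber;
    }

    // Value equality on the identifier replaces a reference check like `a.instance == b.instance`.
    static boolean sameTaskManager(SlotRef a, SlotRef b) {
        return a.taskManagerId.equals(b.taskManagerId);
    }
}
```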

*Side note:* I suggest promoting the `ResourceID` to `TaskManagerID` at 
some point.

This also reworks the `InstanceConnectionInfo` into the `TaskManagerLocation`, 
which contains the `ResourceID` in addition to the IP/hostname/port.
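A sketch of a location value carrying the identifier next to the network address. `LocationSketch` and its accessors are assumptions, not the actual `TaskManagerLocation` API; defining `equals` by the identifier alone is a design choice made for this sketch, following the `ResourceID` reasoning above.

```java
import java.net.InetAddress;
import java.util.Objects;

// Bundles the TaskManager's identity with its network address. Names are hypothetical.
final class LocationSketch {
    private final String resourceId; // stand-in for ResourceID
    private final InetAddress address;
    private final int dataPort;

    LocationSketch(String resourceId, InetAddress address, int dataPort) {
        if (dataPort <= 0 || dataPort > 65535) {
            throw new IllegalArgumentException("invalid port: " + dataPort);
        }
        this.resourceId = Objects.requireNonNull(resourceId, "resourceId");
        this.address = Objects.requireNonNull(address, "address");
        this.dataPort = dataPort;
    }

    String getResourceId() {
        return resourceId;
    }

    InetAddress getAddress() {
        return address;
    }

    int getDataPort() {
        return dataPort;
    }

    // Equality follows the identifier: same ResourceID means same TaskManager process.
    @Override
    public boolean equals(Object o) {
        return o instanceof LocationSketch && resourceId.equals(((LocationSketch) o).resourceId);
    }

    @Override
    public int hashCode() {
        return resourceId.hashCode();
    }

    @Override
    public String toString() {
        return resourceId + "@" + address.getHostAddress() + ":" + dataPort;
    }
}
```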

All logic of the Scheduler and ExecutionGraph remains unchanged.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink slots_works

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2447.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2447


commit 198028f4ff1cc01e0e39bd389937a23b9ea45c7f
Author: Stephan Ewen 
Date:   2016-08-29T14:58:31Z

[FLINK-4490] [distributed coordination] (part 1) Change InstanceConnectionInfo to TaskManagerLocation

This adds the ResourceId to the TaskManagerLocation

commit 02ad27e065e3f32b4503fac6ac5a3138a3b607e1
Author: Stephan Ewen 
Date:   2016-08-30T18:34:20Z

[FLINK-4490] [distributed coordination] (part 2) Make slots independent of 'Instance'.

To allow for a future dynamic slot allocation and release model, the slots should not depend on 'Instance'.
In this change, the Slots hold most of the necessary information directly (location, gateway) and
interact with the Instance only via a 'SlotOwner' interface.

commit e7efae594015e2788f840b0fc3c685880fd35e85
Author: Stephan Ewen 
Date:   2016-08-31T11:52:45Z

[FLINK-4525] [core] (followup) Remove remaining redundant code for pre-defined strictly local assignments.

commit 7b30d8f6246af60b7fceebb0cdde761aee89a714
Author: Stephan Ewen 
Date:   2016-08-31T11:59:01Z

[FLINK-4490] [distributed coordination] (part 3) Rename methods on 'Instance' to have more intuitive names

getResourceID() --> getTaskManagerID()
getInstanceConnectionInfo() --> getTaskManagerLocation()




[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

2016-08-25 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437453#comment-15437453
 ] 

Stephan Ewen commented on FLINK-4490:
-

Agreed, we need this for the FLIP-6 work.
Here is an idea of how we can do this, hopefully without a lot of additional 
work:

(1) We introduce an abstract class {{Slot}} that defines the methods that 
the {{ExecutionGraph}} and {{JobManager}} need, like
- fail()
- release()
- return()
- getTaskManagerGateway()

The {{SlotProvider}} will expose the new {{Slot}}.
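Step (1) might look roughly like the following Java sketch. Since `return` is a Java keyword, the method is spelled `returnSlot()` here; `AbstractSlot`, `SlotProviderSketch` and `InMemorySlot` are hypothetical names, and a `String` stands in for the gateway type.

```java
// The abstract slot the ExecutionGraph and JobManager would program against (names assumed).
abstract class AbstractSlot {
    abstract void fail(Throwable cause);

    abstract void release();

    // Hands the slot back; returns true only if this call actually returned it.
    abstract boolean returnSlot();

    abstract String getTaskManagerGateway(); // String stands in for a gateway type
}

// The provider hands out the new abstraction only; callers never see the concrete slot type.
interface SlotProviderSketch {
    AbstractSlot allocateSlot();
}

// A trivial in-memory implementation, just enough to exercise the abstraction.
final class InMemorySlot extends AbstractSlot {
    private final String gateway;
    private boolean alive = true;
    private Throwable failureCause;

    InMemorySlot(String gateway) {
        this.gateway = gateway;
    }

    @Override
    synchronized void fail(Throwable cause) {
        failureCause = cause;
        alive = false;
    }

    @Override
    synchronized void release() {
        alive = false;
    }

    @Override
    synchronized boolean returnSlot() {
        boolean wasAlive = alive;
        alive = false;
        return wasAlive;
    }

    @Override
    String getTaskManagerGateway() {
        return gateway;
    }

    synchronized Throwable getFailureCause() {
        return failureCause;
    }
}
```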

(2) We implement a version of that slot that internally contains the current 
master's slot, to keep this compatible in the master, and we change the 
ExecutionGraph to work on the new {{Slot}}. The scheduler wraps the old slot in 
the new slot.

(3) We can then implement the FLIP-6 specific variant of the new slot.

> Decouple Slot and Instance
> --
>
> Key: FLINK-4490
> URL: https://issues.apache.org/jira/browse/FLINK-4490
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scheduler
>Reporter: Kurt Young
>