[FLINK-4339] [cluster management] Implement Slot Pool core on JobManager side


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48c936ee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48c936ee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48c936ee

Branch: refs/heads/flip-6
Commit: 48c936eeda7b265f32bbf9f14d15f78eae15c06f
Parents: 34fef47
Author: Kurt Young <ykt...@gmail.com>
Authored: Thu Oct 13 04:59:46 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Oct 13 17:38:17 2016 +0200

----------------------------------------------------------------------
 .../CheckpointCoordinatorGateway.java           |   2 -
 .../clusterframework/types/ResourceProfile.java |   8 +
 .../runtime/clusterframework/types/SlotID.java  |  16 +-
 .../flink/runtime/instance/SlotDescriptor.java  | 161 +++++
 .../apache/flink/runtime/instance/SlotPool.java | 675 +++++++++++++++++++
 .../apache/flink/runtime/akka/AkkaUtils.scala   |   4 +-
 .../runtime/instance/AllocatedSlotsTest.java    | 135 ++++
 .../runtime/instance/AvailableSlotsTest.java    | 123 ++++
 .../flink/runtime/instance/SlotPoolTest.java    | 297 ++++++++
 9 files changed, 1411 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/48c936ee/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
index 196ef5c..fa09123 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
@@ -23,8 +23,6 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
 
-import java.util.UUID;
-
 public interface CheckpointCoordinatorGateway extends RpcGateway {
 
        void acknowledgeCheckpoint(

http://git-wip-us.apache.org/repos/asf/flink/blob/48c936ee/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index fa3aabc..1d8075e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -70,4 +70,12 @@ public class ResourceProfile implements Serializable {
        public boolean isMatching(ResourceProfile required) {
                return cpuCores >= required.getCpuCores() && memoryInMB >= 
required.getMemoryInMB();
        }
+
+       @Override
+       public String toString() {
+               return "ResourceProfile{" +
+                       "cpuCores=" + cpuCores +
+                       ", memoryInMB=" + memoryInMB +
+                       '}';
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/48c936ee/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
index e831a5d..237597b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
@@ -33,11 +33,11 @@ public class SlotID implements ResourceIDRetrievable, 
Serializable {
        private final ResourceID resourceId;
 
        /** The numeric id for single slot */
-       private final int slotId;
+       private final int slotNumber;
 
-       public SlotID(ResourceID resourceId, int slotId) {
+       public SlotID(ResourceID resourceId, int slotNumber) {
                this.resourceId = checkNotNull(resourceId, "ResourceID must not 
be null");
-               this.slotId = slotId;
+               this.slotNumber = slotNumber;
        }
 
        // 
------------------------------------------------------------------------
@@ -47,6 +47,10 @@ public class SlotID implements ResourceIDRetrievable, 
Serializable {
                return resourceId;
        }
 
+       public int getSlotNumber() {
+               return slotNumber;
+       }
+
        // 
------------------------------------------------------------------------
 
        @Override
@@ -60,7 +64,7 @@ public class SlotID implements ResourceIDRetrievable, 
Serializable {
 
                SlotID slotID = (SlotID) o;
 
-               if (slotId != slotID.slotId) {
+               if (slotNumber != slotID.slotNumber) {
                        return false;
                }
                return resourceId.equals(slotID.resourceId);
@@ -69,13 +73,13 @@ public class SlotID implements ResourceIDRetrievable, 
Serializable {
        @Override
        public int hashCode() {
                int result = resourceId.hashCode();
-               result = 31 * result + slotId;
+               result = 31 * result + slotNumber;
                return result;
        }
 
        @Override
        public String toString() {
-               return resourceId + "_" + slotId;
+               return resourceId + "_" + slotNumber;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/48c936ee/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java
new file mode 100644
index 0000000..be7cf96
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The description of a slot. TaskManagers offer one or more task slots, each of which 
defines a slice of
+ * their resources. This description contains some static information 
about the slot, such
+ * as the location and numeric id of the slot, and the rpc gateway to communicate with 
the TaskManager which
+ * owns the slot.
+ */
+public class SlotDescriptor {
+
+       /** The ID of the job this slice belongs to. */
+       private final JobID jobID;
+
+       /** The location information of the TaskManager to which this slot 
belongs */
+       private final TaskManagerLocation taskManagerLocation;
+
+       /** The number of the slot on which the task is deployed */
+       private final int slotNumber;
+
+       /** The resource profile that the slot provides */
+       private final ResourceProfile resourceProfile;
+
+       /** TEMP until the new RPC is in place: The actor gateway to 
communicate with the TaskManager */
+       private final ActorGateway taskManagerActorGateway;
+
+       public SlotDescriptor(
+               final JobID jobID,
+               final TaskManagerLocation location,
+               final int slotNumber,
+               final ResourceProfile resourceProfile,
+               final ActorGateway actorGateway)
+       {
+               this.jobID = checkNotNull(jobID);
+               this.taskManagerLocation = checkNotNull(location);
+               this.slotNumber = slotNumber;
+               this.resourceProfile = checkNotNull(resourceProfile);
+               this.taskManagerActorGateway = checkNotNull(actorGateway);
+       }
+
+       public SlotDescriptor(final SlotDescriptor other) {
+               this.jobID = other.jobID;
+               this.taskManagerLocation = other.taskManagerLocation;
+               this.slotNumber = other.slotNumber;
+               this.resourceProfile = other.resourceProfile;
+               this.taskManagerActorGateway = other.taskManagerActorGateway;
+       }
+       
+       // TODO - temporary workaround until we have the SlotDescriptor in the 
Slot
+       public SlotDescriptor(final Slot slot) {
+               this.jobID = slot.getJobID();
+               this.taskManagerLocation = slot.getTaskManagerLocation();
+               this.slotNumber = slot.getRootSlotNumber();
+               this.resourceProfile = new ResourceProfile(0, 0);
+               this.taskManagerActorGateway = 
slot.getTaskManagerActorGateway();
+       }
+
+       /**
+        * Returns the ID of the job this allocated slot belongs to.
+        *
+        * @return the ID of the job this allocated slot belongs to
+        */
+       public JobID getJobID() {
+               return jobID;
+       }
+
+       /**
+        * Gets the number of the slot.
+        *
+        * @return The number of the slot on the TaskManager.
+        */
+       public int getSlotNumber() {
+               return slotNumber;
+       }
+
+       /**
+        * Gets the resource profile of the slot.
+        *
+        * @return The resource profile of the slot.
+        */
+       public ResourceProfile getResourceProfile() {
+               return resourceProfile;
+       }
+
+       /**
+        * Gets the location info of the TaskManager that offers this slot.
+        *
+        * @return The location info of the TaskManager that offers this slot
+        */
+       public TaskManagerLocation getTaskManagerLocation() {
+               return taskManagerLocation;
+       }
+
+       /**
+        * Gets the actor gateway that can be used to send messages to the 
TaskManager.
+        * <p>
+        * This method should be removed once the new interface-based RPC 
abstraction is in place
+        *
+        * @return The actor gateway that can be used to send messages to the 
TaskManager.
+        */
+       public ActorGateway getTaskManagerActorGateway() {
+               return taskManagerActorGateway;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               SlotDescriptor that = (SlotDescriptor) o;
+
+               if (slotNumber != that.slotNumber) {
+                       return false;
+               }
+               if (!jobID.equals(that.jobID)) {
+                       return false;
+               }
+               return taskManagerLocation.equals(that.taskManagerLocation);
+
+       }
+
+       @Override
+       public int hashCode() {
+               int result = jobID.hashCode();
+               result = 31 * result + taskManagerLocation.hashCode();
+               result = 31 * result + slotNumber;
+               return result;
+       }
+
+       @Override
+       public String toString() {
+               return taskManagerLocation + " - " + slotNumber;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48c936ee/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
new file mode 100644
index 0000000..e7857c1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -0,0 +1,675 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import 
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRejected;
+import 
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The slot pool serves slot requests issued by Scheduler or ExecutionGraph. It 
will attempt to acquire new slots
+ * from the ResourceManager when it cannot serve a slot request. If no 
ResourceManager is currently available,
+ * or it gets a decline from the ResourceManager, or a request times out, it 
fails the slot request. The slot pool also
+ * holds all the slots that were offered to it and accepted, and can thus 
provide registered free slots even if the
+ * ResourceManager is down. The slots will only be released when they are 
useless, e.g. when the job is fully running
+ * but we still have some free slots.
+ * <p>
+ * Every allocation and every slot offering is identified by a self-generated 
AllocationID, which we use to
+ * eliminate ambiguities.
+ */
+public class SlotPool implements SlotOwner {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(SlotPool.class);
+
+       private final Object lock = new Object();
+
+       /** The executor which is used to execute futures */
+       private final Executor executor;
+
+       /** All registered resources, slots will be accepted and used only if 
the resource is registered */
+       private final Set<ResourceID> registeredResources;
+
+       /** The book-keeping of all allocated slots */
+       private final AllocatedSlots allocatedSlots;
+
+       /** The book-keeping of all available slots */
+       private final AvailableSlots availableSlots;
+
+       /** All pending requests waiting for slots */
+       private final Map<AllocationID, Tuple2<SlotRequest, 
FlinkCompletableFuture<SlotDescriptor>>> pendingRequests;
+
+       /** Timeout of slot allocation */
+       private final Time timeout;
+
+       /** the leader id of job manager */
+       private UUID jobManagerLeaderId;
+
+       /** The leader id of resource manager */
+       private UUID resourceManagerLeaderId;
+
+       /** The gateway to communicate with resource manager */
+       private ResourceManagerGateway resourceManagerGateway;
+
+       public SlotPool(final Executor executor) {
+               this.executor = executor;
+               this.registeredResources = new HashSet<>();
+               this.allocatedSlots = new AllocatedSlots();
+               this.availableSlots = new AvailableSlots();
+               this.pendingRequests = new HashMap<>();
+               this.timeout = Time.of(5, TimeUnit.SECONDS);
+       }
+
+       public void setJobManagerLeaderId(final UUID jobManagerLeaderId) {
+               this.jobManagerLeaderId = jobManagerLeaderId;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Slot Allocation
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Try to allocate a simple slot with specified resource profile.
+        *
+        * @param jobID           The job id which the slot allocated for
+        * @param resourceProfile The needed resource profile
+        * @return The future of allocated simple slot
+        */
+       public Future<SimpleSlot> allocateSimpleSlot(final JobID jobID, final 
ResourceProfile resourceProfile) {
+               return allocateSimpleSlot(jobID, resourceProfile, new 
AllocationID());
+       }
+
+
+       /**
+        * Try to allocate a simple slot with specified resource profile and 
specified allocation id. It's mainly
+        * for testing purpose since we need to specify whatever allocation id 
we want.
+        */
+       @VisibleForTesting
+       Future<SimpleSlot> allocateSimpleSlot(
+               final JobID jobID,
+               final ResourceProfile resourceProfile,
+               final AllocationID allocationID)
+       {
+               final FlinkCompletableFuture<SlotDescriptor> future = new 
FlinkCompletableFuture<>();
+
+               internalAllocateSlot(jobID, allocationID, resourceProfile, 
future);
+
+               final SlotOwner owner = this;
+               return future.thenApplyAsync(
+                       new ApplyFunction<SlotDescriptor, SimpleSlot>() {
+                               @Override
+                               public SimpleSlot apply(SlotDescriptor 
descriptor) {
+                                       SimpleSlot slot = new SimpleSlot(
+                                                       descriptor.getJobID(), 
SlotPool.this,
+                                                       
descriptor.getTaskManagerLocation(), descriptor.getSlotNumber(),
+                                                       
descriptor.getTaskManagerActorGateway());
+                                       synchronized (lock) {
+                                               // double validation since we 
are out of the lock protection after the slot is granted
+                                               if 
(registeredResources.contains(descriptor.getTaskManagerLocation().getResourceID()))
 {
+                                                       
LOG.info("Allocation[{}] Allocated simple slot: {} for job {}.", allocationID, 
slot, jobID);
+                                                       
allocatedSlots.add(allocationID, descriptor, slot);
+                                               }
+                                               else {
+                                                       throw new 
RuntimeException("Resource was marked dead asynchronously.");
+                                               }
+                                       }
+                                       return slot;
+                               }
+                       },
+                       executor
+               );
+       }
+
+
+       /**
+        * Try to allocate a shared slot with specified resource profile.
+        *
+        * @param jobID                  The job id which the slot allocated for
+        * @param resourceProfile        The needed resource profile
+        * @param sharingGroupAssignment The slot sharing group of the vertex
+        * @return The future of allocated shared slot
+        */
+       public Future<SharedSlot> allocateSharedSlot(
+               final JobID jobID,
+               final ResourceProfile resourceProfile,
+               final SlotSharingGroupAssignment sharingGroupAssignment)
+       {
+               return allocateSharedSlot(jobID, resourceProfile, 
sharingGroupAssignment, new AllocationID());
+       }
+
+       /**
+        * Try to allocate a shared slot with specified resource profile and 
specified allocation id. It's mainly
+        * for testing purpose since we need to specify whatever allocation id 
we want.
+        */
+       @VisibleForTesting
+       Future<SharedSlot> allocateSharedSlot(
+               final JobID jobID,
+               final ResourceProfile resourceProfile,
+               final SlotSharingGroupAssignment sharingGroupAssignment,
+               final AllocationID allocationID)
+       {
+               final FlinkCompletableFuture<SlotDescriptor> future = new 
FlinkCompletableFuture<>();
+
+               internalAllocateSlot(jobID, allocationID, resourceProfile, 
future);
+
+               return future.thenApplyAsync(
+                       new ApplyFunction<SlotDescriptor, SharedSlot>() {
+                               @Override
+                               public SharedSlot apply(SlotDescriptor 
descriptor) {
+                                       SharedSlot slot = new SharedSlot(
+                                                       descriptor.getJobID(), 
SlotPool.this, descriptor.getTaskManagerLocation(),
+                                                       
descriptor.getSlotNumber(), descriptor.getTaskManagerActorGateway(),
+                                                       sharingGroupAssignment);
+
+                                       synchronized (lock) {
+                                               // double validation since we 
are out of the lock protection after the slot is granted
+                                               if 
(registeredResources.contains(descriptor.getTaskManagerLocation().getResourceID()))
 {
+                                                       
LOG.info("Allocation[{}] Allocated shared slot: {} for job {}.", allocationID, 
slot, jobID);
+                                                       
allocatedSlots.add(allocationID, descriptor, slot);
+                                               }
+                                               else {
+                                                       throw new 
RuntimeException("Resource was marked dead asynchronously.");
+                                               }
+                                       }
+                                       return slot;
+                               }
+                       },
+                       executor
+               );
+       }
+
+       /**
+        * Internally allocate the slot with specified resource profile. We 
will first check whether we have some
+        * free slot which can meet the requirement already and allocate it 
immediately. Otherwise, we will try to
+        * allocate the slot from the resource manager.
+        */
+       private void internalAllocateSlot(
+               final JobID jobID,
+               final AllocationID allocationID,
+               final ResourceProfile resourceProfile,
+               final FlinkCompletableFuture<SlotDescriptor> future)
+       {
+               LOG.info("Allocation[{}] Allocating slot with {} for Job {}.", 
allocationID, resourceProfile, jobID);
+
+               synchronized (lock) {
+                       // check whether we have any free slot which can match 
the required resource profile
+                       SlotDescriptor freeSlot = 
availableSlots.poll(resourceProfile);
+                       if (freeSlot != null) {
+                               future.complete(freeSlot);
+                       }
+                       else {
+                               if (resourceManagerGateway != null) {
+                                       LOG.info("Allocation[{}] No available 
slot exists, trying to allocate from resource manager.",
+                                               allocationID);
+                                       SlotRequest slotRequest = new 
SlotRequest(jobID, allocationID, resourceProfile);
+                                       pendingRequests.put(allocationID, new 
Tuple2<>(slotRequest, future));
+                                       
resourceManagerGateway.requestSlot(jobManagerLeaderId, resourceManagerLeaderId, 
slotRequest, timeout)
+                                               .handleAsync(new 
BiFunction<RMSlotRequestReply, Throwable, Void>() {
+                                                       @Override
+                                                       public Void 
apply(RMSlotRequestReply slotRequestReply, Throwable throwable) {
+                                                               if (throwable 
!= null) {
+                                                                       
future.completeExceptionally(
+                                                                               
new Exception("Slot allocation from resource manager failed", throwable));
+                                                               } else if 
(slotRequestReply instanceof RMSlotRequestRejected) {
+                                                                       
future.completeExceptionally(
+                                                                               
new Exception("Slot allocation rejected by resource manager"));
+                                                               }
+                                                               return null;
+                                                       }
+                                               }, executor);
+                               }
+                               else {
+                                       LOG.warn("Allocation[{}] Resource 
manager not available right now.", allocationID);
+                                       future.completeExceptionally(new 
Exception("Resource manager not available right now."));
+                               }
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Slot De-allocation
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Return the slot back to this pool without releasing it. It's mainly 
called by failed / cancelled tasks, and the
+        * slot can be reused by other pending requests if the resource profile 
matches.
+        *
+        * @param slot The slot that needs to be returned
+        * @return True if the returned slot has been accepted
+        */
+       @Override
+       public boolean returnAllocatedSlot(Slot slot) {
+               checkNotNull(slot);
+               checkArgument(!slot.isAlive(), "slot is still alive");
+               checkArgument(slot.getOwner() == this, "slot belongs to the 
wrong pool.");
+
+               if (slot.markReleased()) {
+                       synchronized (lock) {
+                               final SlotDescriptor slotDescriptor = 
allocatedSlots.remove(slot);
+                               if (slotDescriptor != null) {
+                                       // check if this TaskManager is valid
+                                       if 
(!registeredResources.contains(slot.getTaskManagerID())) {
+                                               return false;
+                                       }
+
+                                       final 
FlinkCompletableFuture<SlotDescriptor> pendingRequest = 
pollPendingRequest(slotDescriptor);
+                                       if (pendingRequest != null) {
+                                               
pendingRequest.complete(slotDescriptor);
+                                       }
+                                       else {
+                                               
availableSlots.add(slotDescriptor);
+                                       }
+
+                                       return true;
+                               }
+                               else {
+                                       throw new 
IllegalArgumentException("Slot was not allocated from this pool.");
+                               }
+                       }
+               }
+               else {
+                       return false;
+               }
+       }
+
+       private FlinkCompletableFuture<SlotDescriptor> pollPendingRequest(final 
SlotDescriptor slotDescriptor) {
+               for (Map.Entry<AllocationID, Tuple2<SlotRequest, 
FlinkCompletableFuture<SlotDescriptor>>> entry : pendingRequests.entrySet()) {
+                       final Tuple2<SlotRequest, 
FlinkCompletableFuture<SlotDescriptor>> pendingRequest = entry.getValue();
+                       if 
(slotDescriptor.getResourceProfile().isMatching(pendingRequest.f0.getResourceProfile()))
 {
+                               pendingRequests.remove(entry.getKey());
+                               return pendingRequest.f1;
+                       }
+               }
+               return null;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Slot Releasing
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Release slot to TaskManager, called for finished tasks or canceled 
jobs.
+        *
+        * @param slot The slot that needs to be released.
+        */
+       public void releaseSlot(final Slot slot) {
+               synchronized (lock) {
+                       allocatedSlots.remove(slot);
+                       availableSlots.remove(new SlotDescriptor(slot));
+                       // TODO: send release request to task manager
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Slot Offering
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Slot offering by a TaskManager for a given AllocationID. The AllocationID is originally
+        * generated by this pool and transferred through the ResourceManager to the TaskManager;
+        * it is used to tell apart the different allocations this pool issued.
+        *
+        * <p>The offer is rejected if the TaskManager is not registered, if the allocation has
+        * already been fulfilled by a different slot, or if no pending request is waiting for this
+        * allocation (maybe it was fulfilled by some other returned slot). Repeated offers of a slot
+        * that is already allocated for this allocation, or already available, are accepted.
+        *
+        * @param allocationID   The allocation id the slot is offered for
+        * @param slotDescriptor The offered slot descriptor
+        * @return True if we accept the offering
+        */
+       public boolean offerSlot(final AllocationID allocationID, final SlotDescriptor slotDescriptor) {
+               synchronized (lock) {
+                       // only slots of registered TaskManagers are considered
+                       final ResourceID offeringTaskManager = slotDescriptor.getTaskManagerLocation().getResourceID();
+                       if (!registeredResources.contains(offeringTaskManager)) {
+                               LOG.warn("Allocation[{}] Slot offering from unregistered TaskManager: {}",
+                                       allocationID, slotDescriptor);
+                               return false;
+                       }
+
+                       // the allocation may already be served, either by this very slot or by another one
+                       final Slot slotInUse = allocatedSlots.get(allocationID);
+                       if (slotInUse != null) {
+                               final SlotDescriptor descriptorInUse = new SlotDescriptor(slotInUse);
+                               final boolean sameSlot = descriptorInUse.equals(slotDescriptor);
+
+                               if (sameSlot) {
+                                       LOG.debug("Allocation[{}] Duplicated slot offering: {}",
+                                               allocationID, slotDescriptor);
+                               }
+                               else {
+                                       LOG.info("Allocation[{}] Allocation had been fulfilled by slot {}, rejecting offered slot {}",
+                                               allocationID, descriptorInUse, slotDescriptor);
+                               }
+                               return sameSlot;
+                       }
+
+                       // the slot may already sit in the free pool
+                       if (availableSlots.contains(slotDescriptor)) {
+                               LOG.debug("Allocation[{}] Duplicated slot offering: {}",
+                                       allocationID, slotDescriptor);
+                               return true;
+                       }
+
+                       // nobody is waiting for this allocation, the slot is unwanted
+                       if (!pendingRequests.containsKey(allocationID)) {
+                               return false;
+                       }
+
+                       // hand the slot to the request that has been waiting for it
+                       FlinkCompletableFuture<SlotDescriptor> waitingFuture = pendingRequests.remove(allocationID).f1;
+                       waitingFuture.complete(slotDescriptor);
+                       return true;
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Resource
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Register a TaskManager to this pool by adding its id to the registered resources.
+        * Slot offers are only accepted from registered TaskManagers (offerSlot rejects all
+        * others), which gives us a way to keep "dead" or "abnormal" TaskManagers out of
+        * this pool.
+        *
+        * @param resourceID The id of the TaskManager
+        */
+       public void registerResource(final ResourceID resourceID) {
+               synchronized (lock) {
+                       registeredResources.add(resourceID);
+               }
+       }
+
+       /**
+        * Unregister a TaskManager from this pool. Its available slots are dropped and every slot
+        * currently allocated from it is released via {@link Slot#releaseSlot()}. Called when we
+        * find that some TaskManager became "dead" or "abnormal" and we decide to not use slots
+        * from it anymore.
+        *
+        * @param resourceID The id of the TaskManager
+        */
+       public void releaseResource(final ResourceID resourceID) {
+               synchronized (lock) {
+                       registeredResources.remove(resourceID);
+                       availableSlots.removeByResource(resourceID);
+
+                       // getSlotsByResource hands out a copy, so the bookkeeping may change while we iterate
+                       // (presumably releasing a slot can call back into this pool -- to be confirmed)
+                       for (Slot allocated : allocatedSlots.getSlotsByResource(resourceID)) {
+                               allocated.releaseSlot();
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  ResourceManager
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Stores the leader id and the gateway of the ResourceManager this pool should use.
+        *
+        * @param resourceManagerLeaderId The leader id of the ResourceManager
+        * @param resourceManagerGateway  The gateway of the ResourceManager
+        */
+       public void setResourceManager(
+               final UUID resourceManagerLeaderId,
+               final ResourceManagerGateway resourceManagerGateway) {
+
+               synchronized (lock) {
+                       this.resourceManagerGateway = resourceManagerGateway;
+                       this.resourceManagerLeaderId = resourceManagerLeaderId;
+               }
+       }
+
+       /**
+        * Forgets the current ResourceManager by clearing both its leader id and its gateway.
+        */
+       public void disconnectResourceManager() {
+               synchronized (lock) {
+                       resourceManagerGateway = null;
+                       resourceManagerLeaderId = null;
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Helper classes
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Organize allocated slots from different points of view.
+        */
+       static class AllocatedSlots {
+
+               /** All allocated slots organized by TaskManager */
+               private final Map<ResourceID, Set<Slot>> 
allocatedSlotsByResource;
+
+               /** All allocated slots organized by Slot object */
+               private final Map<Slot, AllocationID> allocatedSlots;
+
+               private final Map<Slot, SlotDescriptor> 
allocatedSlotsWithDescriptor;
+
+               /** All allocated slots organized by AllocationID */
+               private final Map<AllocationID, Slot> allocatedSlotsById;
+
+               AllocatedSlots() {
+                       this.allocatedSlotsByResource = new HashMap<>();
+                       this.allocatedSlots = new HashMap<>();
+                       this.allocatedSlotsWithDescriptor = new HashMap<>();
+                       this.allocatedSlotsById = new HashMap<>();
+               }
+
+               /**
+                * Add a new allocation
+                *
+                * @param allocationID The allocation id
+                * @param slot         The allocated slot
+                */
+               void add(final AllocationID allocationID, final SlotDescriptor 
descriptor, final Slot slot) {
+                       allocatedSlots.put(slot, allocationID);
+                       allocatedSlotsById.put(allocationID, slot);
+                       allocatedSlotsWithDescriptor.put(slot, descriptor);
+
+                       final ResourceID resourceID = slot.getTaskManagerID();
+                       Set<Slot> slotsForResource = 
allocatedSlotsByResource.get(resourceID);
+                       if (slotsForResource == null) {
+                               slotsForResource = new HashSet<>();
+                               allocatedSlotsByResource.put(resourceID, 
slotsForResource);
+                       }
+                       slotsForResource.add(slot);
+               }
+
+               /**
+                * Get allocated slot with allocation id
+                *
+                * @param allocationID The allocation id
+                * @return The allocated slot, null if we can't find a match
+                */
+               Slot get(final AllocationID allocationID) {
+                       return allocatedSlotsById.get(allocationID);
+               }
+
+               /**
+                * Check whether we have allocated this slot
+                *
+                * @param slot The slot needs to checked
+                * @return True if we contains this slot
+                */
+               boolean contains(final Slot slot) {
+                       return allocatedSlots.containsKey(slot);
+               }
+
+               /**
+                * Remove an allocation with slot.
+                *
+                * @param slot The slot needs to be removed
+                */
+               SlotDescriptor remove(final Slot slot) {
+                       final SlotDescriptor descriptor = 
allocatedSlotsWithDescriptor.remove(slot);
+                       if (descriptor != null) {
+                               final AllocationID allocationID = 
allocatedSlots.remove(slot);
+                               if (allocationID != null) {
+                                       allocatedSlotsById.remove(allocationID);
+                               } else {
+                                       throw new IllegalStateException("Bug: 
maps are inconsistent");
+                               }
+
+                               final ResourceID resourceID = 
slot.getTaskManagerID();
+                               final Set<Slot> slotsForResource = 
allocatedSlotsByResource.get(resourceID);
+                               slotsForResource.remove(slot);
+                               if (slotsForResource.isEmpty()) {
+                                       
allocatedSlotsByResource.remove(resourceID);
+                               }
+                               
+                               return descriptor;
+                       } else {
+                               return null;
+                       }
+               }
+
+               /**
+                * Get all allocated slot from same TaskManager.
+                *
+                * @param resourceID The id of the TaskManager
+                * @return Set of slots which are allocated from the same 
TaskManager
+                */
+               Set<Slot> getSlotsByResource(final ResourceID resourceID) {
+                       Set<Slot> slotsForResource = 
allocatedSlotsByResource.get(resourceID);
+                       if (slotsForResource != null) {
+                               return new HashSet<>(slotsForResource);
+                       }
+                       else {
+                               return new HashSet<>();
+                       }
+               }
+
+               @VisibleForTesting
+               boolean containResource(final ResourceID resourceID) {
+                       return allocatedSlotsByResource.containsKey(resourceID);
+               }
+
+               @VisibleForTesting
+               int size() {
+                       return allocatedSlots.size();
+               }
+       }
+
+       /**
+        * Organize all available slots from different points of view: as one flat set and
+        * grouped by the TaskManager they belong to. This class does no locking on its own.
+        */
+       static class AvailableSlots {
+
+               /** All available slots organized by TaskManager */
+               private final Map<ResourceID, Set<SlotDescriptor>> availableSlotsByResource;
+
+               /** All available slots */
+               private final Set<SlotDescriptor> availableSlots;
+
+               AvailableSlots() {
+                       this.availableSlotsByResource = new HashMap<>();
+                       this.availableSlots = new HashSet<>();
+               }
+
+               /**
+                * Add an available slot.
+                *
+                * @param descriptor The descriptor of the slot
+                */
+               void add(final SlotDescriptor descriptor) {
+                       availableSlots.add(descriptor);
+
+                       final ResourceID resourceID = descriptor.getTaskManagerLocation().getResourceID();
+                       Set<SlotDescriptor> slotsForResource = availableSlotsByResource.get(resourceID);
+                       if (slotsForResource == null) {
+                               slotsForResource = new HashSet<>();
+                               availableSlotsByResource.put(resourceID, slotsForResource);
+                       }
+                       slotsForResource.add(descriptor);
+               }
+
+               /**
+                * Check whether we have this slot
+                *
+                * @param slotDescriptor The descriptor of the slot
+                * @return True if we contain this slot
+                */
+               boolean contains(final SlotDescriptor slotDescriptor) {
+                       return availableSlots.contains(slotDescriptor);
+               }
+
+               /**
+                * Poll a slot which matches the required resource profile. The returned slot is
+                * removed from the available slots.
+                *
+                * @param resourceProfile The required resource profile
+                * @return Slot which matches the resource profile, null if we can't find a match
+                */
+               SlotDescriptor poll(final ResourceProfile resourceProfile) {
+                       for (SlotDescriptor slotDescriptor : availableSlots) {
+                               if (slotDescriptor.getResourceProfile().isMatching(resourceProfile)) {
+                                       // modifying the set is fine here, we leave the loop right away
+                                       remove(slotDescriptor);
+                                       return slotDescriptor;
+                               }
+                       }
+                       return null;
+               }
+
+               /**
+                * Remove all available slots that come from the specified TaskManager.
+                *
+                * @param resourceID The id of the TaskManager
+                */
+               void removeByResource(final ResourceID resourceID) {
+                       final Set<SlotDescriptor> slotsForResource = availableSlotsByResource.remove(resourceID);
+                       if (slotsForResource != null) {
+                               availableSlots.removeAll(slotsForResource);
+                       }
+               }
+
+               /**
+                * Remove a single slot from the available slots. Does nothing if the slot is not
+                * available; callers such as SlotPool#releaseSlot invoke this for slots that were
+                * allocated and hence may not be contained here.
+                *
+                * @param slotDescriptor The descriptor of the slot to remove
+                */
+               private void remove(final SlotDescriptor slotDescriptor) {
+                       if (!availableSlots.remove(slotDescriptor)) {
+                               // unknown slot: there may be no entry for its TaskManager at all, so the
+                               // lookup below must not be attempted (it used to fail with a NullPointerException)
+                               return;
+                       }
+
+                       // the slot was available, so its TaskManager must have an entry as well
+                       final ResourceID resourceID = slotDescriptor.getTaskManagerLocation().getResourceID();
+                       final Set<SlotDescriptor> slotsForResource = checkNotNull(availableSlotsByResource.get(resourceID));
+                       slotsForResource.remove(slotDescriptor);
+                       if (slotsForResource.isEmpty()) {
+                               availableSlotsByResource.remove(resourceID);
+                       }
+               }
+
+               @VisibleForTesting
+               boolean containResource(final ResourceID resourceID) {
+                       return availableSlotsByResource.containsKey(resourceID);
+               }
+
+               @VisibleForTesting
+               int size() {
+                       return availableSlots.size();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48c936ee/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 9209d15..2461340 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -583,8 +583,8 @@ object AkkaUtils {
   }
 
   def formatDurationParingErrorMessage: String = {
-    "Duration format must be \"val unit\", where 'val' is a number and 'unit' 
is " + 
-      "(d|day)|(h|hour)|(min|minute)|s|sec|second)|(ms|milli|millisecond)|"+
+    "Duration format must be \"val unit\", where 'val' is a number and 'unit' 
is " +
+      "(d|day)|(h|hour)|(min|minute)|s|sec|second)|(ms|milli|millisecond)|" +
       "(µs|micro|microsecond)|(ns|nano|nanosecond)"
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/48c936ee/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
new file mode 100644
index 0000000..655a3ea
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests the bookkeeping of {@link SlotPool.AllocatedSlots}: three slots on two TaskManagers
+ * are added and then removed one by one, checking all lookups after every step.
+ */
+public class AllocatedSlotsTest {
+
+       @Test
+       public void testOperations() throws Exception {
+               SlotPool.AllocatedSlots allocatedSlots = new SlotPool.AllocatedSlots();
+
+               // first slot on resource1
+               final AllocationID allocation1 = new AllocationID();
+               final ResourceID resource1 = new ResourceID("resource1");
+               final Slot slot1 = createSlot(resource1);
+
+               allocatedSlots.add(allocation1, new SlotDescriptor(slot1), slot1);
+
+               assertTrue(allocatedSlots.contains(slot1));
+               assertTrue(allocatedSlots.containResource(resource1));
+
+               assertEquals(slot1, allocatedSlots.get(allocation1));
+               assertEquals(1, allocatedSlots.getSlotsByResource(resource1).size());
+               assertEquals(1, allocatedSlots.size());
+
+               // second slot on the same resource
+               final AllocationID allocation2 = new AllocationID();
+               final Slot slot2 = createSlot(resource1);
+
+               allocatedSlots.add(allocation2, new SlotDescriptor(slot2), slot2);
+
+               assertTrue(allocatedSlots.contains(slot1));
+               assertTrue(allocatedSlots.contains(slot2));
+               assertTrue(allocatedSlots.containResource(resource1));
+
+               assertEquals(slot1, allocatedSlots.get(allocation1));
+               assertEquals(slot2, allocatedSlots.get(allocation2));
+               assertEquals(2, allocatedSlots.getSlotsByResource(resource1).size());
+               assertEquals(2, allocatedSlots.size());
+
+               // third slot on a different resource
+               final AllocationID allocation3 = new AllocationID();
+               final ResourceID resource2 = new ResourceID("resource2");
+               final Slot slot3 = createSlot(resource2);
+
+               // the descriptor has to describe the slot it is registered with (was slot2 by mistake)
+               allocatedSlots.add(allocation3, new SlotDescriptor(slot3), slot3);
+
+               assertTrue(allocatedSlots.contains(slot1));
+               assertTrue(allocatedSlots.contains(slot2));
+               assertTrue(allocatedSlots.contains(slot3));
+               assertTrue(allocatedSlots.containResource(resource1));
+               assertTrue(allocatedSlots.containResource(resource2));
+
+               assertEquals(slot1, allocatedSlots.get(allocation1));
+               assertEquals(slot2, allocatedSlots.get(allocation2));
+               assertEquals(slot3, allocatedSlots.get(allocation3));
+               assertEquals(2, allocatedSlots.getSlotsByResource(resource1).size());
+               assertEquals(1, allocatedSlots.getSlotsByResource(resource2).size());
+               assertEquals(3, allocatedSlots.size());
+
+               // removing slot2 leaves resource1 with one slot
+               allocatedSlots.remove(slot2);
+
+               assertTrue(allocatedSlots.contains(slot1));
+               assertFalse(allocatedSlots.contains(slot2));
+               assertTrue(allocatedSlots.contains(slot3));
+               assertTrue(allocatedSlots.containResource(resource1));
+               assertTrue(allocatedSlots.containResource(resource2));
+
+               assertEquals(slot1, allocatedSlots.get(allocation1));
+               assertNull(allocatedSlots.get(allocation2));
+               assertEquals(slot3, allocatedSlots.get(allocation3));
+               assertEquals(1, allocatedSlots.getSlotsByResource(resource1).size());
+               assertEquals(1, allocatedSlots.getSlotsByResource(resource2).size());
+               assertEquals(2, allocatedSlots.size());
+
+               // removing slot1 makes resource1 disappear entirely
+               allocatedSlots.remove(slot1);
+
+               assertFalse(allocatedSlots.contains(slot1));
+               assertFalse(allocatedSlots.contains(slot2));
+               assertTrue(allocatedSlots.contains(slot3));
+               assertFalse(allocatedSlots.containResource(resource1));
+               assertTrue(allocatedSlots.containResource(resource2));
+
+               assertNull(allocatedSlots.get(allocation1));
+               assertNull(allocatedSlots.get(allocation2));
+               assertEquals(slot3, allocatedSlots.get(allocation3));
+               assertEquals(0, allocatedSlots.getSlotsByResource(resource1).size());
+               assertEquals(1, allocatedSlots.getSlotsByResource(resource2).size());
+               assertEquals(1, allocatedSlots.size());
+
+               // removing the last slot leaves the bookkeeping empty
+               allocatedSlots.remove(slot3);
+
+               assertFalse(allocatedSlots.contains(slot1));
+               assertFalse(allocatedSlots.contains(slot2));
+               assertFalse(allocatedSlots.contains(slot3));
+               assertFalse(allocatedSlots.containResource(resource1));
+               assertFalse(allocatedSlots.containResource(resource2));
+
+               assertNull(allocatedSlots.get(allocation1));
+               assertNull(allocatedSlots.get(allocation2));
+               assertNull(allocatedSlots.get(allocation3));
+               assertEquals(0, allocatedSlots.getSlotsByResource(resource1).size());
+               assertEquals(0, allocatedSlots.getSlotsByResource(resource2).size());
+               assertEquals(0, allocatedSlots.size());
+       }
+
+       /**
+        * Creates a mocked slot which reports the given TaskManager id.
+        */
+       private Slot createSlot(final ResourceID resourceId) {
+               Slot slot = mock(Slot.class);
+               when(slot.getTaskManagerID()).thenReturn(resourceId);
+               return slot;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48c936ee/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
new file mode 100644
index 0000000..872810f
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class AvailableSlotsTest {
+
+       static final ResourceProfile DEFAULT_TESTING_PROFILE = new 
ResourceProfile(1.0, 512);
+
+       static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE = new 
ResourceProfile(2.0, 1024);
+
+       @Test
+       public void testAddAndRemove() throws Exception {
+               SlotPool.AvailableSlots availableSlots = new 
SlotPool.AvailableSlots();
+
+               final ResourceID resource1 = new ResourceID("resource1");
+               final ResourceID resource2 = new ResourceID("resource2");
+
+               final SlotDescriptor slot1 = createSlotDescriptor(resource1);
+               final SlotDescriptor slot2 = createSlotDescriptor(resource1);
+               final SlotDescriptor slot3 = createSlotDescriptor(resource2);
+
+               availableSlots.add(slot1);
+               availableSlots.add(slot2);
+               availableSlots.add(slot3);
+
+               assertEquals(3, availableSlots.size());
+               assertTrue(availableSlots.contains(slot1));
+               assertTrue(availableSlots.contains(slot2));
+               assertTrue(availableSlots.contains(slot3));
+               assertTrue(availableSlots.containResource(resource1));
+               assertTrue(availableSlots.containResource(resource2));
+
+               availableSlots.removeByResource(resource1);
+
+               assertEquals(1, availableSlots.size());
+               assertFalse(availableSlots.contains(slot1));
+               assertFalse(availableSlots.contains(slot2));
+               assertTrue(availableSlots.contains(slot3));
+               assertFalse(availableSlots.containResource(resource1));
+               assertTrue(availableSlots.containResource(resource2));
+
+               availableSlots.removeByResource(resource2);
+
+               assertEquals(0, availableSlots.size());
+               assertFalse(availableSlots.contains(slot1));
+               assertFalse(availableSlots.contains(slot2));
+               assertFalse(availableSlots.contains(slot3));
+               assertFalse(availableSlots.containResource(resource1));
+               assertFalse(availableSlots.containResource(resource2));
+       }
+
+       @Test
+       public void testPollFreeSlot() {
+               SlotPool.AvailableSlots availableSlots = new 
SlotPool.AvailableSlots();
+
+               final ResourceID resource1 = new ResourceID("resource1");
+               final SlotDescriptor slot1 = createSlotDescriptor(resource1);
+
+               availableSlots.add(slot1);
+
+               assertEquals(1, availableSlots.size());
+               assertTrue(availableSlots.contains(slot1));
+               assertTrue(availableSlots.containResource(resource1));
+
+               assertNull(availableSlots.poll(DEFAULT_TESTING_BIG_PROFILE));
+
+               assertEquals(slot1, 
availableSlots.poll(DEFAULT_TESTING_PROFILE));
+               assertEquals(0, availableSlots.size());
+               assertFalse(availableSlots.contains(slot1));
+               assertFalse(availableSlots.containResource(resource1));
+       }
+
+       static SlotDescriptor createSlotDescriptor(final ResourceID resourceID) 
{
+               return createSlotDescriptor(resourceID, new JobID());
+       }
+
+       static SlotDescriptor createSlotDescriptor(final ResourceID resourceID, 
final JobID jobID) {
+               return createSlotDescriptor(resourceID, jobID, 
DEFAULT_TESTING_PROFILE);
+       }
+
+       static SlotDescriptor createSlotDescriptor(final ResourceID resourceID, 
final JobID jobID,
+               final ResourceProfile resourceProfile)
+       {
+               return createSlotDescriptor(resourceID, jobID, resourceProfile, 
0);
+       }
+
+       static SlotDescriptor createSlotDescriptor(final ResourceID resourceID, 
final JobID jobID,
+               final ResourceProfile resourceProfile, final int slotNumber)
+       {
+               TaskManagerLocation location = mock(TaskManagerLocation.class);
+               when(location.getResourceID()).thenReturn(resourceID);
+               return new SlotDescriptor(jobID, location, slotNumber, 
resourceProfile, mock(ActorGateway.class));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48c936ee/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
new file mode 100644
index 0000000..30cdbd6
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
+import static 
org.apache.flink.runtime.instance.AvailableSlotsTest.createSlotDescriptor;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class SlotPoolTest extends TestLogger {
+
+       private Executor executor;
+
+       private SlotPool slotPool;
+
+       private ResourceManagerGateway resourceManagerGateway;
+
+       /**
+        * Creates a fresh {@link SlotPool} on a one-thread pool and wires it to a mocked
+        * resource manager gateway whose {@code requestSlot} call returns a mocked future.
+        */
+       @Before
+       public void setUp() throws Exception {
+               executor = Executors.newFixedThreadPool(1);
+               slotPool = new SlotPool(executor);
+
+               resourceManagerGateway = mock(ResourceManagerGateway.class);
+               when(resourceManagerGateway.requestSlot(
+                               any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class)))
+                       .thenReturn(mock(Future.class));
+
+               final UUID resourceManagerId = UUID.randomUUID();
+               slotPool.setResourceManager(resourceManagerId, resourceManagerGateway);
+
+               final UUID jobManagerLeaderId = UUID.randomUUID();
+               slotPool.setJobManagerLeaderId(jobManagerLeaderId);
+       }
+
+       /**
+        * Shuts down the thread pool created in {@code setUp()} so that its worker thread
+        * does not outlive the test case.
+        */
+       @After
+       public void tearDown() throws Exception {
+               // The field is declared as a plain Executor, which has no shutdown method,
+               // so narrow it first. instanceof is also false for null, i.e. when setUp failed early.
+               if (executor instanceof java.util.concurrent.ExecutorService) {
+                       ((java.util.concurrent.ExecutorService) executor).shutdownNow();
+               }
+       }
+
+       /**
+        * A pending simple-slot request is completed once a slot is offered under the
+        * same allocation id.
+        */
+       @Test
+       public void testAllocateSimpleSlot() throws Exception {
+               final JobID jobId = new JobID();
+               final AllocationID allocationId = new AllocationID();
+               final ResourceID taskManagerId = new ResourceID("resource");
+               slotPool.registerResource(taskManagerId);
+
+               // no slot has been offered yet: the request stays pending and goes to the resource manager
+               final Future<SimpleSlot> pendingSlot =
+                       slotPool.allocateSimpleSlot(jobId, DEFAULT_TESTING_PROFILE, allocationId);
+               assertFalse(pendingSlot.isDone());
+               verify(resourceManagerGateway).requestSlot(
+                       any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
+
+               final SlotDescriptor offered =
+                       createSlotDescriptor(taskManagerId, jobId, DEFAULT_TESTING_PROFILE);
+               assertTrue(slotPool.offerSlot(allocationId, offered));
+
+               final SimpleSlot allocated = pendingSlot.get(1, TimeUnit.SECONDS);
+               assertTrue(pendingSlot.isDone());
+               assertTrue(allocated.isAlive());
+               assertEquals(taskManagerId, allocated.getTaskManagerID());
+               assertEquals(jobId, allocated.getJobID());
+               assertEquals(slotPool, allocated.getOwner());
+       }
+
+       /**
+        * A pending shared-slot request is completed by a matching offer, and the
+        * resulting shared slot can hand out a live sub-slot for the vertex of its group.
+        */
+       @Test
+       public void testAllocateSharedSlot() throws Exception {
+               final JobVertexID vertexId = new JobVertexID();
+               final SlotSharingGroupAssignment sharingAssignment =
+                       new SlotSharingGroup(vertexId).getTaskAssignment();
+               final JobID jobId = new JobID();
+               final AllocationID allocationId = new AllocationID();
+
+               final ResourceID taskManagerId = new ResourceID("resource");
+               slotPool.registerResource(taskManagerId);
+
+               final Future<SharedSlot> pendingSlot = slotPool.allocateSharedSlot(
+                       jobId, DEFAULT_TESTING_PROFILE, sharingAssignment, allocationId);
+
+               assertFalse(pendingSlot.isDone());
+               verify(resourceManagerGateway).requestSlot(
+                       any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
+
+               final SlotDescriptor offered =
+                       createSlotDescriptor(taskManagerId, jobId, DEFAULT_TESTING_PROFILE);
+               assertTrue(slotPool.offerSlot(allocationId, offered));
+
+               final SharedSlot allocated = pendingSlot.get(1, TimeUnit.SECONDS);
+               assertTrue(pendingSlot.isDone());
+               assertTrue(allocated.isAlive());
+               assertEquals(taskManagerId, allocated.getTaskManagerID());
+               assertEquals(jobId, allocated.getJobID());
+               assertEquals(slotPool, allocated.getOwner());
+
+               // the shared slot must be able to serve the vertex it was created for
+               final SimpleSlot subSlot = allocated.allocateSubSlot(vertexId);
+               assertNotNull(subSlot);
+               assertTrue(subSlot.isAlive());
+       }
+
+       /**
+        * After the resource manager has been disconnected, an allocation request must
+        * fail: the returned future completes exceptionally and its handler sees a
+        * {@code null} slot together with a non-null throwable.
+        */
+       @Test
+       public void testAllocateSlotWithoutResourceManager() throws Exception {
+               slotPool.disconnectResourceManager();
+               Future<SimpleSlot> future = slotPool.allocateSimpleSlot(new JobID(), DEFAULT_TESTING_PROFILE);
+
+               // Keep the derived future: the assertions below run on the executor thread, and
+               // waiting on this future is the only way their failure can reach the test thread.
+               Future<Void> callbackOutcome = future.handleAsync(
+                       new BiFunction<SimpleSlot, Throwable, Void>() {
+                               @Override
+                               public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
+                                       assertNull(simpleSlot);
+                                       assertNotNull(throwable);
+                                       return null;
+                               }
+                       },
+                       executor);
+
+               try {
+                       future.get(1, TimeUnit.SECONDS);
+                       fail("We expected a ExecutionException.");
+               } catch (ExecutionException ex) {
+                       // we expect the exception
+               }
+
+               // NOTE(review): relies on handleAsync failing the returned future when the
+               // callback throws - confirm against the Future implementation in use.
+               callbackOutcome.get(1, TimeUnit.SECONDS);
+       }
+
+       /**
+        * With two pending requests and a single offered slot, the second request is
+        * completed as soon as the first slot is released back to the pool.
+        */
+       @Test
+       public void testAllocationFulfilledByReturnedSlot() throws Exception {
+               final JobID jobId = new JobID();
+               final AllocationID firstAllocation = new AllocationID();
+               final AllocationID secondAllocation = new AllocationID();
+
+               final ResourceID taskManagerId = new ResourceID("resource");
+               slotPool.registerResource(taskManagerId);
+
+               final Future<SimpleSlot> firstRequest =
+                       slotPool.allocateSimpleSlot(jobId, DEFAULT_TESTING_PROFILE, firstAllocation);
+               final Future<SimpleSlot> secondRequest =
+                       slotPool.allocateSimpleSlot(jobId, DEFAULT_TESTING_PROFILE, secondAllocation);
+
+               assertFalse(firstRequest.isDone());
+               assertFalse(secondRequest.isDone());
+               verify(resourceManagerGateway, times(2)).requestSlot(
+                       any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
+
+               // only one slot is offered, and it is offered for the first allocation
+               final SlotDescriptor offered =
+                       createSlotDescriptor(taskManagerId, jobId, DEFAULT_TESTING_PROFILE);
+               assertTrue(slotPool.offerSlot(firstAllocation, offered));
+
+               final SimpleSlot firstSlot = firstRequest.get(1, TimeUnit.SECONDS);
+               assertTrue(firstRequest.isDone());
+               assertFalse(secondRequest.isDone());
+
+               // hand the slot back to the pool
+               firstSlot.releaseSlot();
+
+               // the returned slot now serves the request that was still waiting
+               final SimpleSlot secondSlot = secondRequest.get(1, TimeUnit.SECONDS);
+               assertTrue(secondRequest.isDone());
+
+               assertNotEquals(firstSlot, secondSlot);
+               assertTrue(firstSlot.isReleased());
+               assertTrue(secondSlot.isAlive());
+               assertEquals(firstSlot.getTaskManagerID(), secondSlot.getTaskManagerID());
+               assertEquals(firstSlot.getSlotNumber(), secondSlot.getSlotNumber());
+       }
+
+       /**
+        * A slot that was released back to the pool is reused for a request that is
+        * issued afterwards.
+        */
+       @Test
+       public void testAllocateWithFreeSlot() throws Exception {
+               final JobID jobId = new JobID();
+               final AllocationID firstAllocation = new AllocationID();
+
+               final ResourceID taskManagerId = new ResourceID("resource");
+               slotPool.registerResource(taskManagerId);
+
+               final Future<SimpleSlot> firstRequest =
+                       slotPool.allocateSimpleSlot(jobId, DEFAULT_TESTING_PROFILE, firstAllocation);
+               assertFalse(firstRequest.isDone());
+
+               final SlotDescriptor offered =
+                       createSlotDescriptor(taskManagerId, jobId, DEFAULT_TESTING_PROFILE);
+               assertTrue(slotPool.offerSlot(firstAllocation, offered));
+
+               final SimpleSlot firstSlot = firstRequest.get(1, TimeUnit.SECONDS);
+               assertTrue(firstRequest.isDone());
+
+               // hand the slot back to the pool before the next request is made
+               firstSlot.releaseSlot();
+
+               final AllocationID secondAllocation = new AllocationID();
+               final Future<SimpleSlot> secondRequest =
+                       slotPool.allocateSimpleSlot(jobId, DEFAULT_TESTING_PROFILE, secondAllocation);
+
+               // the second request is served from the slot that was just returned
+               final SimpleSlot secondSlot = secondRequest.get(1, TimeUnit.SECONDS);
+               assertTrue(secondRequest.isDone());
+
+               assertNotEquals(firstSlot, secondSlot);
+               assertTrue(firstSlot.isReleased());
+               assertTrue(secondSlot.isAlive());
+               assertEquals(firstSlot.getTaskManagerID(), secondSlot.getTaskManagerID());
+               assertEquals(firstSlot.getSlotNumber(), secondSlot.getSlotNumber());
+       }
+
+       /**
+        * Walks through the accept/reject decisions of {@code offerSlot}: offers from an
+        * unregistered resource, with an unknown allocation id, or conflicting with a
+        * slot in use are rejected, while the matching offer and repeated offers of the
+        * same descriptor are accepted.
+        */
+       @Test
+       public void testOfferSlot() throws Exception {
+               final JobID jobId = new JobID();
+               final AllocationID allocationId = new AllocationID();
+
+               final ResourceID taskManagerId = new ResourceID("resource");
+               slotPool.registerResource(taskManagerId);
+
+               final Future<SimpleSlot> pendingSlot =
+                       slotPool.allocateSimpleSlot(jobId, DEFAULT_TESTING_PROFILE, allocationId);
+               assertFalse(pendingSlot.isDone());
+               verify(resourceManagerGateway).requestSlot(
+                       any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
+
+               // rejected: the offering resource was never registered
+               final ResourceID unknownTaskManager = new ResourceID("unregistered");
+               final SlotDescriptor fromUnknownResource =
+                       createSlotDescriptor(unknownTaskManager, jobId, DEFAULT_TESTING_PROFILE);
+               assertFalse(slotPool.offerSlot(allocationId, fromUnknownResource));
+
+               final SlotDescriptor validOffer =
+                       createSlotDescriptor(taskManagerId, jobId, DEFAULT_TESTING_PROFILE);
+
+               // rejected: the allocation id does not match the pending request
+               assertFalse(slotPool.offerSlot(new AllocationID(), validOffer));
+
+               // accepted: registered resource and matching allocation id
+               assertTrue(slotPool.offerSlot(allocationId, validOffer));
+               final SimpleSlot allocated = pendingSlot.get(1, TimeUnit.SECONDS);
+               assertTrue(pendingSlot.isDone());
+               assertTrue(allocated.isAlive());
+
+               // rejected: a different descriptor for an allocation that is already in use
+               final SlotDescriptor conflictingOffer =
+                       createSlotDescriptor(taskManagerId, jobId, DEFAULT_TESTING_PROFILE);
+               assertFalse(slotPool.offerSlot(allocationId, conflictingOffer));
+
+               // accepted: the same descriptor offered again while the slot is in use
+               assertTrue(slotPool.offerSlot(allocationId, validOffer));
+               assertTrue(pendingSlot.isDone());
+               assertTrue(allocated.isAlive());
+
+               // accepted: the same descriptor offered again after the slot became free
+               allocated.releaseSlot();
+               assertTrue(allocated.isReleased());
+               assertTrue(slotPool.offerSlot(allocationId, validOffer));
+       }
+
+       /**
+        * Releasing a resource releases the slot allocated from it and does not use
+        * that slot to complete another pending request.
+        */
+       @Test
+       public void testReleaseResource() throws Exception {
+               final JobID jobId = new JobID();
+               final AllocationID firstAllocation = new AllocationID();
+               final AllocationID secondAllocation = new AllocationID();
+
+               final ResourceID taskManagerId = new ResourceID("resource");
+               slotPool.registerResource(taskManagerId);
+
+               final Future<SimpleSlot> firstRequest =
+                       slotPool.allocateSimpleSlot(jobId, DEFAULT_TESTING_PROFILE, firstAllocation);
+               final Future<SimpleSlot> secondRequest =
+                       slotPool.allocateSimpleSlot(jobId, DEFAULT_TESTING_PROFILE, secondAllocation);
+
+               final SlotDescriptor offered =
+                       createSlotDescriptor(taskManagerId, jobId, DEFAULT_TESTING_PROFILE);
+               assertTrue(slotPool.offerSlot(firstAllocation, offered));
+
+               final SimpleSlot firstSlot = firstRequest.get(1, TimeUnit.SECONDS);
+               assertTrue(firstRequest.isDone());
+               assertFalse(secondRequest.isDone());
+
+               slotPool.releaseResource(taskManagerId);
+               assertTrue(firstSlot.isReleased());
+
+               // slot released and not usable, second allocation still not fulfilled
+               Thread.sleep(10);
+               assertFalse(secondRequest.isDone());
+       }
+
+}

Reply via email to