[FLINK-4529] [flip-6] Move TaskExecutor, JobMaster and ResourceManager out of 
the rpc package

The TaskExecutor, the JobMaster and the ResourceManager were still contained in 
the rpc
package. With this commit, they will be moved out of this package. Now they are 
contained
in dedicated packages on the o.a.f.runtime level.

This closes #2438.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9c247d1f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9c247d1f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9c247d1f

Branch: refs/heads/flip-6
Commit: 9c247d1ffd732d101145ef3ef6e8050151128f5c
Parents: 4b077af
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Mon Aug 29 16:35:29 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:39 2016 +0200

----------------------------------------------------------------------
 .../runtime/clusterframework/SlotManager.java   | 525 ------------
 .../flink/runtime/jobmaster/JobMaster.java      | 244 ++++++
 .../runtime/jobmaster/JobMasterGateway.java     |  45 +
 .../registration/RegistrationResponse.java      |  84 ++
 .../registration/RetryingRegistration.java      | 296 +++++++
 .../resourcemanager/JobMasterRegistration.java  |  35 +
 .../resourcemanager/RegistrationResponse.java   |  43 +
 .../resourcemanager/ResourceManager.java        | 214 +++++
 .../resourcemanager/ResourceManagerGateway.java |  77 ++
 .../runtime/resourcemanager/SlotAssignment.java |  25 +
 .../runtime/resourcemanager/SlotManager.java    | 523 ++++++++++++
 .../runtime/resourcemanager/SlotRequest.java    |  74 ++
 .../flink/runtime/rpc/jobmaster/JobMaster.java  | 244 ------
 .../runtime/rpc/jobmaster/JobMasterGateway.java |  45 -
 .../rpc/registration/RegistrationResponse.java  |  84 --
 .../rpc/registration/RetryingRegistration.java  | 296 -------
 .../resourcemanager/JobMasterRegistration.java  |  35 -
 .../resourcemanager/RegistrationResponse.java   |  43 -
 .../rpc/resourcemanager/ResourceManager.java    | 214 -----
 .../resourcemanager/ResourceManagerGateway.java |  77 --
 .../rpc/resourcemanager/SlotAssignment.java     |  25 -
 .../rpc/resourcemanager/SlotRequest.java        |  74 --
 .../runtime/rpc/taskexecutor/SlotReport.java    |  56 --
 .../runtime/rpc/taskexecutor/SlotStatus.java    | 129 ---
 .../runtime/rpc/taskexecutor/TaskExecutor.java  | 827 -------------------
 .../taskexecutor/TaskExecutorConfiguration.java | 151 ----
 .../rpc/taskexecutor/TaskExecutorGateway.java   |  35 -
 .../TaskExecutorRegistrationSuccess.java        |  75 --
 ...TaskExecutorToResourceManagerConnection.java | 198 -----
 .../flink/runtime/taskexecutor/SlotReport.java  |  56 ++
 .../flink/runtime/taskexecutor/SlotStatus.java  | 129 +++
 .../runtime/taskexecutor/TaskExecutor.java      | 827 +++++++++++++++++++
 .../taskexecutor/TaskExecutorConfiguration.java | 151 ++++
 .../taskexecutor/TaskExecutorGateway.java       |  35 +
 .../TaskExecutorRegistrationSuccess.java        |  75 ++
 ...TaskExecutorToResourceManagerConnection.java | 198 +++++
 .../clusterframework/ClusterShutdownITCase.java | 156 ++++
 .../clusterframework/ResourceManagerITCase.java | 162 ++++
 .../clusterframework/ResourceManagerTest.java   | 338 ++++++++
 .../clusterframework/SlotManagerTest.java       | 540 ------------
 .../registration/RetryingRegistrationTest.java  | 336 ++++++++
 .../registration/TestRegistrationGateway.java   |  85 ++
 .../resourcemanager/ClusterShutdownITCase.java  | 156 ----
 .../resourcemanager/ResourceManagerHATest.java  |  76 ++
 .../resourcemanager/ResourceManagerITCase.java  | 162 ----
 .../resourcemanager/ResourceManagerTest.java    | 338 --------
 .../resourcemanager/SlotManagerTest.java        | 538 ++++++++++++
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    |  14 -
 .../registration/RetryingRegistrationTest.java  | 336 --------
 .../registration/TestRegistrationGateway.java   |  85 --
 .../resourcemanager/ResourceManagerHATest.java  |  76 --
 .../rpc/taskexecutor/TaskExecutorTest.java      | 117 ---
 .../runtime/taskexecutor/TaskExecutorTest.java  | 117 +++
 53 files changed, 4939 insertions(+), 4957 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/SlotManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/SlotManager.java
deleted file mode 100644
index cc140a1..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/SlotManager.java
+++ /dev/null
@@ -1,525 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.rpc.taskexecutor.SlotReport;
-import org.apache.flink.runtime.rpc.taskexecutor.SlotStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * SlotManager is responsible for receiving slot requests and do slot 
allocations. It allows to request
- * slots from registered TaskManagers and issues container allocation requests 
in case of there are not
- * enough available slots. Besides, it should sync its slot allocation with 
TaskManager's heartbeat.
- * <p>
- * The main operation principle of SlotManager is:
- * <ul>
- * <li>1. All slot allocation status should be synced with TaskManager, which 
is the ground truth.</li>
- * <li>2. All slots that have registered must be tracked, either by free pool 
or allocated pool.</li>
- * <li>3. All slot requests will be handled by best efforts, there is no 
guarantee that one request will be
- * fulfilled in time or correctly allocated. Conflicts or timeout or some 
special error will happen, it should
- * be handled outside SlotManager. SlotManager will make each decision based 
on the information it currently
- * holds.</li>
- * </ul>
- * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
- */
-public abstract class SlotManager {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(SlotManager.class);
-
-       /** Gateway to communicate with ResourceManager */
-       private final ResourceManagerGateway resourceManagerGateway;
-
-       /** All registered slots, including free and allocated slots */
-       private final Map<ResourceID, Map<SlotID, ResourceSlot>> 
registeredSlots;
-
-       /** All pending slot requests, waiting available slots to fulfil */
-       private final Map<AllocationID, SlotRequest> pendingSlotRequests;
-
-       /** All free slots that can be used to be allocated */
-       private final Map<SlotID, ResourceSlot> freeSlots;
-
-       /** All allocations, we can lookup allocations either by SlotID or 
AllocationID */
-       private final AllocationMap allocationMap;
-
-       public SlotManager(ResourceManagerGateway resourceManagerGateway) {
-               this.resourceManagerGateway = 
checkNotNull(resourceManagerGateway);
-               this.registeredSlots = new HashMap<>(16);
-               this.pendingSlotRequests = new LinkedHashMap<>(16);
-               this.freeSlots = new HashMap<>(16);
-               this.allocationMap = new AllocationMap();
-       }
-
-       // 
------------------------------------------------------------------------
-       //  slot managements
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Request a slot with requirements, we may either fulfill the request 
or pending it. Trigger container
-        * allocation if we don't have enough resource. If we have free slot 
which can match the request, record
-        * this allocation and forward the request to TaskManager through 
ResourceManager (we want this done by
-        * RPC's main thread to avoid race condition).
-        *
-        * @param request The detailed request of the slot
-        */
-       public void requestSlot(final SlotRequest request) {
-               if (isRequestDuplicated(request)) {
-                       LOG.warn("Duplicated slot request, AllocationID:{}", 
request.getAllocationId());
-                       return;
-               }
-
-               // try to fulfil the request with current free slots
-               ResourceSlot slot = chooseSlotToUse(request, freeSlots);
-               if (slot != null) {
-                       LOG.info("Assigning SlotID({}) to AllocationID({}), 
JobID:{}", slot.getSlotId(),
-                               request.getAllocationId(), request.getJobId());
-
-                       // record this allocation in bookkeeping
-                       allocationMap.addAllocation(slot.getSlotId(), 
request.getAllocationId());
-
-                       // remove selected slot from free pool
-                       freeSlots.remove(slot.getSlotId());
-
-                       // TODO: send slot request to TaskManager
-               } else {
-                       LOG.info("Cannot fulfil slot request, try to allocate a 
new container for it, " +
-                               "AllocationID:{}, JobID:{}", 
request.getAllocationId(), request.getJobId());
-                       allocateContainer(request.getResourceProfile());
-                       pendingSlotRequests.put(request.getAllocationId(), 
request);
-               }
-       }
-
-       /**
-        * Sync slot status with TaskManager's SlotReport.
-        */
-       public void updateSlotStatus(final SlotReport slotReport) {
-               for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
-                       updateSlotStatus(slotStatus);
-               }
-       }
-
-       /**
-        * The slot request to TaskManager may be either failed by rpc 
communication (timeout, network error, etc.)
-        * or really rejected by TaskManager. We shall retry this request by:
-        * <ul>
-        * <li>1. verify and clear all the previous allocate information for 
this request
-        * <li>2. try to request slot again
-        * </ul>
-        * <p>
-        * This may cause some duplicate allocation, e.g. the slot request to 
TaskManager is successful but the response
-        * is lost somehow, so we may request a slot in another TaskManager, 
this causes two slots assigned to one request,
-        * but it can be taken care of by rejecting registration at JobManager.
-        *
-        * @param originalRequest The original slot request
-        * @param slotId          The target SlotID
-        */
-       public void handleSlotRequestFailedAtTaskManager(final SlotRequest 
originalRequest, final SlotID slotId) {
-               final AllocationID originalAllocationId = 
originalRequest.getAllocationId();
-               LOG.info("Slot request failed at TaskManager, SlotID:{}, 
AllocationID:{}, JobID:{}",
-                       slotId, originalAllocationId, 
originalRequest.getJobId());
-
-               // verify the allocation info before we do anything
-               if (freeSlots.containsKey(slotId)) {
-                       // this slot is currently empty, no need to de-allocate 
it from our allocations
-                       LOG.info("Original slot is somehow empty, retrying this 
request");
-
-                       // before retry, we should double check whether this 
request was allocated by some other ways
-                       if (!allocationMap.isAllocated(originalAllocationId)) {
-                               requestSlot(originalRequest);
-                       } else {
-                               LOG.info("The failed request has somehow been 
allocated, SlotID:{}",
-                                       
allocationMap.getSlotID(originalAllocationId));
-                       }
-               } else if (allocationMap.isAllocated(slotId)) {
-                       final AllocationID currentAllocationId = 
allocationMap.getAllocationID(slotId);
-
-                       // check whether we have an agreement on whom this slot 
belongs to
-                       if (originalAllocationId.equals(currentAllocationId)) {
-                               LOG.info("De-allocate this request and retry");
-                               
allocationMap.removeAllocation(currentAllocationId);
-
-                               // put this slot back to free pool
-                               ResourceSlot slot = 
checkNotNull(getRegisteredSlot(slotId));
-                               freeSlots.put(slotId, slot);
-
-                               // retry the request
-                               requestSlot(originalRequest);
-                       } else {
-                               // the slot is taken by someone else, no need 
to de-allocate it from our allocations
-                               LOG.info("Original slot is taken by someone 
else, current AllocationID:{}", currentAllocationId);
-
-                               // before retry, we should double check whether 
this request was allocated by some other ways
-                               if 
(!allocationMap.isAllocated(originalAllocationId)) {
-                                       requestSlot(originalRequest);
-                               } else {
-                                       LOG.info("The failed request is somehow 
been allocated, SlotID:{}",
-                                               
allocationMap.getSlotID(originalAllocationId));
-                               }
-                       }
-               } else {
-                       LOG.error("BUG! {} is neither in free pool nor in 
allocated pool", slotId);
-               }
-       }
-
-       /**
-        * Callback for TaskManager failures. In case that a TaskManager fails, 
we have to clean up all its slots.
-        *
-        * @param resourceId The ResourceID of the TaskManager
-        */
-       public void notifyTaskManagerFailure(final ResourceID resourceId) {
-               LOG.info("Resource:{} been notified failure", resourceId);
-               final Map<SlotID, ResourceSlot> slotIdsToRemove = 
registeredSlots.remove(resourceId);
-               if (slotIdsToRemove != null) {
-                       for (SlotID slotId : slotIdsToRemove.keySet()) {
-                               LOG.info("Removing Slot:{} upon resource 
failure", slotId);
-                               if (freeSlots.containsKey(slotId)) {
-                                       freeSlots.remove(slotId);
-                               } else if (allocationMap.isAllocated(slotId)) {
-                                       allocationMap.removeAllocation(slotId);
-                               } else {
-                                       LOG.error("BUG! {} is neither in free 
pool nor in allocated pool", slotId);
-                               }
-                       }
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  internal behaviors
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Update slot status based on TaskManager's report. There are mainly 
two situations when we receive the report:
-        * <ul>
-        * <li>1. The slot is newly registered.</li>
-        * <li>2. The slot has registered, it contains its current status.</li>
-        * </ul>
-        * <p>
-        * Regarding 1: It's fairly simple, we just record this slot's status, 
and trigger schedule if slot is empty.
-        * <p>
-        * Regarding 2: It will cause some weird situation since we may have 
some time-gap on how the slot's status really
-        * is. We may have some updates on the slot's allocation, but it 
doesn't reflected by TaskManager's heartbeat yet,
-        * and we may make some wrong decision if we cannot guarantee we have 
the exact status about all the slots. So
-        * the principle here is: We always trust TaskManager's heartbeat, we 
will correct our information based on that
-        * and take next action based on the diff between our information and 
heartbeat status.
-        *
-        * @param reportedStatus Reported slot status
-        */
-       void updateSlotStatus(final SlotStatus reportedStatus) {
-               final SlotID slotId = reportedStatus.getSlotID();
-               final ResourceSlot slot = new ResourceSlot(slotId, 
reportedStatus.getProfiler());
-
-               if (registerNewSlot(slot)) {
-                       // we have a newly registered slot
-                       LOG.info("New slot appeared, SlotID:{}, 
AllocationID:{}", slotId, reportedStatus.getAllocationID());
-
-                       if (reportedStatus.getAllocationID() != null) {
-                               // slot in use, record this in bookkeeping
-                               allocationMap.addAllocation(slotId, 
reportedStatus.getAllocationID());
-                       } else {
-                               handleFreeSlot(new ResourceSlot(slotId, 
reportedStatus.getProfiler()));
-                       }
-               } else {
-                       // slot exists, update current information
-                       if (reportedStatus.getAllocationID() != null) {
-                               // slot is reported in use
-                               final AllocationID reportedAllocationId = 
reportedStatus.getAllocationID();
-
-                               // check whether we also thought this slot is 
in use
-                               if (allocationMap.isAllocated(slotId)) {
-                                       // we also think that slot is in use, 
check whether the AllocationID matches
-                                       final AllocationID currentAllocationId 
= allocationMap.getAllocationID(slotId);
-
-                                       if 
(!reportedAllocationId.equals(currentAllocationId)) {
-                                               LOG.info("Slot allocation info 
mismatch! SlotID:{}, current:{}, reported:{}",
-                                                       slotId, 
currentAllocationId, reportedAllocationId);
-
-                                               // seems we have a disagreement 
about the slot assignments, need to correct it
-                                               
allocationMap.removeAllocation(slotId);
-                                               
allocationMap.addAllocation(slotId, reportedAllocationId);
-                                       }
-                               } else {
-                                       LOG.info("Slot allocation info 
mismatch! SlotID:{}, current:null, reported:{}",
-                                               slotId, reportedAllocationId);
-
-                                       // we thought the slot is free, should 
correct this information
-                                       allocationMap.addAllocation(slotId, 
reportedStatus.getAllocationID());
-
-                                       // remove this slot from free slots pool
-                                       freeSlots.remove(slotId);
-                               }
-                       } else {
-                               // slot is reported empty
-
-                               // check whether we also thought this slot is 
empty
-                               if (allocationMap.isAllocated(slotId)) {
-                                       LOG.info("Slot allocation info 
mismatch! SlotID:{}, current:{}, reported:null",
-                                               slotId, 
allocationMap.getAllocationID(slotId));
-
-                                       // we thought the slot is in use, 
correct it
-                                       allocationMap.removeAllocation(slotId);
-
-                                       // we have a free slot!
-                                       handleFreeSlot(new ResourceSlot(slotId, 
reportedStatus.getProfiler()));
-                               }
-                       }
-               }
-       }
-
-       /**
-        * When we have a free slot, try to fulfill the pending request first. 
If any request can be fulfilled,
-        * record this allocation in bookkeeping and send slot request to 
TaskManager, else we just add this slot
-        * to the free pool.
-        *
-        * @param freeSlot The free slot
-        */
-       private void handleFreeSlot(final ResourceSlot freeSlot) {
-               SlotRequest chosenRequest = chooseRequestToFulfill(freeSlot, 
pendingSlotRequests);
-
-               if (chosenRequest != null) {
-                       
pendingSlotRequests.remove(chosenRequest.getAllocationId());
-
-                       LOG.info("Assigning SlotID({}) to AllocationID({}), 
JobID:{}", freeSlot.getSlotId(),
-                               chosenRequest.getAllocationId(), 
chosenRequest.getJobId());
-                       allocationMap.addAllocation(freeSlot.getSlotId(), 
chosenRequest.getAllocationId());
-
-                       // TODO: send slot request to TaskManager
-               } else {
-                       freeSlots.put(freeSlot.getSlotId(), freeSlot);
-               }
-       }
-
-       /**
-        * Check whether the request is duplicated. We use AllocationID to 
identify slot request, for each
-        * formerly received slot request, it is either in pending list or 
already been allocated.
-        *
-        * @param request The slot request
-        * @return <tt>true</tt> if the request is duplicated
-        */
-       private boolean isRequestDuplicated(final SlotRequest request) {
-               final AllocationID allocationId = request.getAllocationId();
-               return pendingSlotRequests.containsKey(allocationId)
-                       || allocationMap.isAllocated(allocationId);
-       }
-
-       /**
-        * Try to register slot, and tell if this slot is newly registered.
-        *
-        * @param slot The ResourceSlot which will be checked and registered
-        * @return <tt>true</tt> if we meet a new slot
-        */
-       private boolean registerNewSlot(final ResourceSlot slot) {
-               final SlotID slotId = slot.getSlotId();
-               final ResourceID resourceId = slotId.getResourceID();
-               if (!registeredSlots.containsKey(resourceId)) {
-                       registeredSlots.put(resourceId, new HashMap<SlotID, 
ResourceSlot>());
-               }
-               return registeredSlots.get(resourceId).put(slotId, slot) == 
null;
-       }
-
-       private ResourceSlot getRegisteredSlot(final SlotID slotId) {
-               final ResourceID resourceId = slotId.getResourceID();
-               if (!registeredSlots.containsKey(resourceId)) {
-                       return null;
-               }
-               return registeredSlots.get(resourceId).get(slotId);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Framework specific behavior
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Choose a slot to use among all free slots, the behavior is framework 
specified.
-        *
-        * @param request   The slot request
-        * @param freeSlots All slots which can be used
-        * @return The slot we choose to use, <tt>null</tt> if we did not find 
a match
-        */
-       protected abstract ResourceSlot chooseSlotToUse(final SlotRequest 
request,
-               final Map<SlotID, ResourceSlot> freeSlots);
-
-       /**
-        * Choose a pending request to fulfill when we have a free slot, the 
behavior is framework specified.
-        *
-        * @param offeredSlot     The free slot
-        * @param pendingRequests All the pending slot requests
-        * @return The chosen SlotRequest, <tt>null</tt> if we did not find a 
match
-        */
-       protected abstract SlotRequest chooseRequestToFulfill(final 
ResourceSlot offeredSlot,
-               final Map<AllocationID, SlotRequest> pendingRequests);
-
-       /**
-        * The framework specific code for allocating a container for specified 
resource profile.
-        *
-        * @param resourceProfile The resource profile
-        */
-       protected abstract void allocateContainer(final ResourceProfile 
resourceProfile);
-
-
-       // 
------------------------------------------------------------------------
-       //  Helper classes
-       // 
------------------------------------------------------------------------
-
-       /**
-        * We maintain all the allocations with SlotID and AllocationID. We are 
able to get or remove the allocation info
-        * either by SlotID or AllocationID.
-        */
-       private static class AllocationMap {
-
-               /** All allocated slots (by SlotID) */
-               private final Map<SlotID, AllocationID> allocatedSlots;
-
-               /** All allocated slots (by AllocationID), it'a a inverse view 
of allocatedSlots */
-               private final Map<AllocationID, SlotID> 
allocatedSlotsByAllocationId;
-
-               AllocationMap() {
-                       this.allocatedSlots = new HashMap<>(16);
-                       this.allocatedSlotsByAllocationId = new HashMap<>(16);
-               }
-
-               /**
-                * Add a allocation
-                *
-                * @param slotId       The slot id
-                * @param allocationId The allocation id
-                */
-               void addAllocation(final SlotID slotId, final AllocationID 
allocationId) {
-                       allocatedSlots.put(slotId, allocationId);
-                       allocatedSlotsByAllocationId.put(allocationId, slotId);
-               }
-
-               /**
-                * De-allocation with slot id
-                *
-                * @param slotId The slot id
-                */
-               void removeAllocation(final SlotID slotId) {
-                       if (allocatedSlots.containsKey(slotId)) {
-                               final AllocationID allocationId = 
allocatedSlots.get(slotId);
-                               allocatedSlots.remove(slotId);
-                               
allocatedSlotsByAllocationId.remove(allocationId);
-                       }
-               }
-
-               /**
-                * De-allocation with allocation id
-                *
-                * @param allocationId The allocation id
-                */
-               void removeAllocation(final AllocationID allocationId) {
-                       if 
(allocatedSlotsByAllocationId.containsKey(allocationId)) {
-                               SlotID slotId = 
allocatedSlotsByAllocationId.get(allocationId);
-                               
allocatedSlotsByAllocationId.remove(allocationId);
-                               allocatedSlots.remove(slotId);
-                       }
-               }
-
-               /**
-                * Check whether allocation exists by slot id
-                *
-                * @param slotId The slot id
-                * @return true if the allocation exists
-                */
-               boolean isAllocated(final SlotID slotId) {
-                       return allocatedSlots.containsKey(slotId);
-               }
-
-               /**
-                * Check whether allocation exists by allocation id
-                *
-                * @param allocationId The allocation id
-                * @return true if the allocation exists
-                */
-               boolean isAllocated(final AllocationID allocationId) {
-                       return 
allocatedSlotsByAllocationId.containsKey(allocationId);
-               }
-
-               AllocationID getAllocationID(final SlotID slotId) {
-                       return allocatedSlots.get(slotId);
-               }
-
-               SlotID getSlotID(final AllocationID allocationId) {
-                       return allocatedSlotsByAllocationId.get(allocationId);
-               }
-
-               public int size() {
-                       return allocatedSlots.size();
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Testing utilities
-       // 
------------------------------------------------------------------------
-
-       @VisibleForTesting
-       boolean isAllocated(final SlotID slotId) {
-               return allocationMap.isAllocated(slotId);
-       }
-
-       @VisibleForTesting
-       boolean isAllocated(final AllocationID allocationId) {
-               return allocationMap.isAllocated(allocationId);
-       }
-
-       /**
-        * Add free slots directly to the free pool, this will not trigger 
pending requests allocation
-        *
-        * @param slot The resource slot
-        */
-       @VisibleForTesting
-       void addFreeSlot(final ResourceSlot slot) {
-               final ResourceID resourceId = slot.getResourceID();
-               final SlotID slotId = slot.getSlotId();
-
-               if (!registeredSlots.containsKey(resourceId)) {
-                       registeredSlots.put(resourceId, new HashMap<SlotID, 
ResourceSlot>());
-               }
-               registeredSlots.get(resourceId).put(slot.getSlotId(), slot);
-               freeSlots.put(slotId, slot);
-       }
-
-       @VisibleForTesting
-       int getAllocatedSlotCount() {
-               return allocationMap.size();
-       }
-
-       @VisibleForTesting
-       int getFreeSlotCount() {
-               return freeSlots.size();
-       }
-
-       @VisibleForTesting
-       int getPendingRequestCount() {
-               return pendingSlotRequests.size();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
new file mode 100644
index 0000000..0a6a7ef
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.UUID;
+
+/**
+ * JobMaster implementation. The job master is responsible for the execution 
of a single
+ * {@link org.apache.flink.runtime.jobgraph.JobGraph}.
+ * <p>
+ * It offers the following methods as part of its rpc interface to interact 
with the JobMaster
+ * remotely:
+ * <ul>
+ *     <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the 
task execution state for
+ * given task</li>
+ * </ul>
+ */
+public class JobMaster extends RpcEndpoint<JobMasterGateway> {
+
+       /** Gateway to connected resource manager, null iff not connected */
+       private ResourceManagerGateway resourceManager = null;
+
+       /** Logical representation of the job */
+       private final JobGraph jobGraph;
+       private final JobID jobID;
+
+       /** Configuration of the job */
+       private final Configuration configuration;
+
+       /** Service to contend for and retrieve the leadership of JM and RM */
+       private final HighAvailabilityServices highAvailabilityServices;
+
+       /** Leader Management */
+       private LeaderElectionService leaderElectionService = null;
+       private UUID leaderSessionID;
+
+       /**
+        * The JM's Constructor
+        *
+        * @param jobGraph The representation of the job's execution plan
+        * @param configuration The job's configuration
+        * @param rpcService The RPC service at which the JM serves
+        * @param highAvailabilityService The cluster's HA service from the JM 
can elect and retrieve leaders.
+        */
+       public JobMaster(
+               JobGraph jobGraph,
+               Configuration configuration,
+               RpcService rpcService,
+               HighAvailabilityServices highAvailabilityService) {
+
+               super(rpcService);
+
+               this.jobGraph = Preconditions.checkNotNull(jobGraph);
+               this.jobID = Preconditions.checkNotNull(jobGraph.getJobID());
+
+               this.configuration = Preconditions.checkNotNull(configuration);
+
+               this.highAvailabilityServices = 
Preconditions.checkNotNull(highAvailabilityService);
+       }
+
+       public ResourceManagerGateway getResourceManager() {
+               return resourceManager;
+       }
+
+       
//----------------------------------------------------------------------------------------------
+       // Initialization methods
+       
//----------------------------------------------------------------------------------------------
+       public void start() {
+               super.start();
+
+               // register at the election once the JM starts
+               registerAtElectionService();
+       }
+
+
+       
//----------------------------------------------------------------------------------------------
+       // JobMaster Leadership methods
+       
//----------------------------------------------------------------------------------------------
+
+       /**
+        * Retrieves the election service and contend for the leadership.
+        */
+       private void registerAtElectionService() {
+               try {
+                       leaderElectionService = 
highAvailabilityServices.getJobMasterLeaderElectionService(jobID);
+                       leaderElectionService.start(new 
JobMasterLeaderContender());
+               } catch (Exception e) {
+                       throw new RuntimeException("Fail to register at the 
election of JobMaster", e);
+               }
+       }
+
+       /**
+        * Start the execution when the leadership is granted.
+        *
+        * @param newLeaderSessionID The identifier of the new leadership 
session
+        */
+       public void grantJobMasterLeadership(final UUID newLeaderSessionID) {
+               runAsync(new Runnable() {
+                       @Override
+                       public void run() {
+                               log.info("JobManager {} grants leadership with 
session id {}.", getAddress(), newLeaderSessionID);
+
+                               // The operation may be blocking, but since JM 
is idle before it grants the leadership, it's okay that
+                               // JM waits here for the operation's 
completeness.
+                               leaderSessionID = newLeaderSessionID;
+                               
leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
+
+                               // TODO:: execute the job when the leadership 
is granted.
+                       }
+               });
+       }
+
+       /**
+        * Stop the execution when the leadership is revoked.
+        */
+       public void revokeJobMasterLeadership() {
+               runAsync(new Runnable() {
+                       @Override
+                       public void run() {
+                               log.info("JobManager {} was revoked 
leadership.", getAddress());
+
+                               // TODO:: cancel the job's execution and notify 
all listeners
+                               cancelAndClearEverything(new 
Exception("JobManager is no longer the leader."));
+
+                               leaderSessionID = null;
+                       }
+               });
+       }
+
+       /**
+        * Handles error occurring in the leader election service
+        *
+        * @param exception Exception thrown in the leader election service
+        */
+       public void onJobMasterElectionError(final Exception exception) {
+               runAsync(new Runnable() {
+                       @Override
+                       public void run() {
+                               log.error("Received an error from the 
LeaderElectionService.", exception);
+
+                               // TODO:: cancel the job's execution and 
shutdown the JM
+                               cancelAndClearEverything(exception);
+
+                               leaderSessionID = null;
+                       }
+               });
+
+       }
+
+       
//----------------------------------------------------------------------------------------------
+       // RPC methods
+       
//----------------------------------------------------------------------------------------------
+
+       /**
+        * Updates the task execution state for a given task.
+        *
+        * @param taskExecutionState New task execution state for a given task
+        * @return Acknowledge the task execution state update
+        */
+       @RpcMethod
+       public Acknowledge updateTaskExecutionState(TaskExecutionState 
taskExecutionState) {
+               System.out.println("TaskExecutionState: " + taskExecutionState);
+               return Acknowledge.get();
+       }
+
+       /**
+        * Triggers the registration of the job master at the resource manager.
+        *
+        * @param address Address of the resource manager
+        */
+       @RpcMethod
+       public void registerAtResourceManager(final String address) {
+               //TODO:: register at the RM
+       }
+
+       
//----------------------------------------------------------------------------------------------
+       // Helper methods
+       
//----------------------------------------------------------------------------------------------
+
+       /**
+        * Cancel the current job and notify all listeners the job's 
cancellation.
+        *
+        * @param cause Cause for the cancelling.
+        */
+       private void cancelAndClearEverything(Throwable cause) {
+               // currently, nothing to do here
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utility classes
+       // 
------------------------------------------------------------------------
+       private class JobMasterLeaderContender implements LeaderContender {
+
+               @Override
+               public void grantLeadership(UUID leaderSessionID) {
+                       
JobMaster.this.grantJobMasterLeadership(leaderSessionID);
+               }
+
+               @Override
+               public void revokeLeadership() {
+                       JobMaster.this.revokeJobMasterLeadership();
+               }
+
+               @Override
+               public String getAddress() {
+                       return JobMaster.this.getAddress();
+               }
+
+               @Override
+               public void handleError(Exception exception) {
+                       onJobMasterElectionError(exception);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
new file mode 100644
index 0000000..a53e383
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import scala.concurrent.Future;
+
+/**
+ * {@link JobMaster} rpc gateway interface
+ */
+public interface JobMasterGateway extends RpcGateway {
+
+       /**
+        * Updates the task execution state for a given task.
+        *
+        * @param taskExecutionState New task execution state for a given task
+        * @return Future acknowledge of the task execution state update
+        */
+       Future<Acknowledge> updateTaskExecutionState(TaskExecutionState 
taskExecutionState);
+
+       /**
+        * Triggers the registration of the job master at the resource manager.
+        *
+        * @param address Address of the resource manager
+        */
+       void registerAtResourceManager(final String address);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationResponse.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationResponse.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationResponse.java
new file mode 100644
index 0000000..fefcc78
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationResponse.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.registration;
+
+import java.io.Serializable;
+
+/**
+ * Base class for responses given to registration attempts from {@link 
RetryingRegistration}.
+ */
+public abstract class RegistrationResponse implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       // 
----------------------------------------------------------------------------
+       
+       /**
+        * Base class for a successful registration. Concrete registration 
implementations
+        * will typically extend this class to attach more information.
+        */
+       public static class Success extends RegistrationResponse {
+               private static final long serialVersionUID = 1L;
+               
+               @Override
+               public String toString() {
+                       return "Registration Successful";
+               }
+       }
+
+       // 
----------------------------------------------------------------------------
+
+       /**
+        * A rejected (declined) registration.
+        */
+       public static final class Decline extends RegistrationResponse {
+               private static final long serialVersionUID = 1L;
+
+               /** the rejection reason */
+               private final String reason;
+
+               /**
+                * Creates a new rejection message.
+                * 
+                * @param reason The reason for the rejection.
+                */
+               public Decline(String reason) {
+                       this.reason = reason != null ? reason : "(unknown)";
+               }
+
+               /**
+                * Gets the reason for the rejection.
+                */
+               public String getReason() {
+                       return reason;
+               }
+
+               @Override
+               public String toString() {
+                       return "Registration Declined (" + reason + ')';
+               }
+       }
+}
+
+
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
new file mode 100644
index 0000000..88fe9b5
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.registration;
+
+import akka.dispatch.OnFailure;
+import akka.dispatch.OnSuccess;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+import scala.concurrent.impl.Promise.DefaultPromise;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+
+/**
+ * This utility class implements the basis of registering one component at 
another component,
+ * for example registering the TaskExecutor at the ResourceManager.
+ * This {@code RetryingRegistration} implements both the initial address 
resolution
+ * and the retries-with-backoff strategy.
+ * 
+ * <p>The registration gives access to a future that is completed upon 
successful registration.
+ * The registration can be canceled, for example when the target where it 
tries to register
+ * at looses leader status.
+ * 
+ * @param <Gateway> The type of the gateway to connect to.
+ * @param <Success> The type of the successful registration responses.
+ */
+public abstract class RetryingRegistration<Gateway extends RpcGateway, Success 
extends RegistrationResponse.Success> {
+
+       // 
------------------------------------------------------------------------
+       //  default configuration values
+       // 
------------------------------------------------------------------------
+
+       /** default value for the initial registration timeout (milliseconds) */
+       private static final long INITIAL_REGISTRATION_TIMEOUT_MILLIS = 100;
+
+       /** default value for the maximum registration timeout, after 
exponential back-off (milliseconds) */
+       private static final long MAX_REGISTRATION_TIMEOUT_MILLIS = 30000;
+
+       /** The pause (milliseconds) made after an registration attempt caused 
an exception (other than timeout) */
+       private static final long ERROR_REGISTRATION_DELAY_MILLIS = 10000;
+
+       /** The pause (milliseconds) made after the registration attempt was 
refused */
+       private static final long REFUSED_REGISTRATION_DELAY_MILLIS = 30000;
+
+       // 
------------------------------------------------------------------------
+       // Fields
+       // 
------------------------------------------------------------------------
+
+       private final Logger log;
+
+       private final RpcService rpcService;
+
+       private final String targetName;
+
+       private final Class<Gateway> targetType;
+
+       private final String targetAddress;
+
+       private final UUID leaderId;
+
+       private final Promise<Tuple2<Gateway, Success>> completionPromise;
+
+       private final long initialRegistrationTimeout;
+
+       private final long maxRegistrationTimeout;
+
+       private final long delayOnError;
+
+       private final long delayOnRefusedRegistration;
+
+       private volatile boolean canceled;
+
+       // 
------------------------------------------------------------------------
+
+       public RetryingRegistration(
+                       Logger log,
+                       RpcService rpcService,
+                       String targetName,
+                       Class<Gateway> targetType,
+                       String targetAddress,
+                       UUID leaderId) {
+               this(log, rpcService, targetName, targetType, targetAddress, 
leaderId,
+                               INITIAL_REGISTRATION_TIMEOUT_MILLIS, 
MAX_REGISTRATION_TIMEOUT_MILLIS,
+                               ERROR_REGISTRATION_DELAY_MILLIS, 
REFUSED_REGISTRATION_DELAY_MILLIS);
+       }
+
+       public RetryingRegistration(
+                       Logger log,
+                       RpcService rpcService,
+                       String targetName, 
+                       Class<Gateway> targetType,
+                       String targetAddress,
+                       UUID leaderId,
+                       long initialRegistrationTimeout,
+                       long maxRegistrationTimeout,
+                       long delayOnError,
+                       long delayOnRefusedRegistration) {
+
+               checkArgument(initialRegistrationTimeout > 0, "initial 
registration timeout must be greater than zero");
+               checkArgument(maxRegistrationTimeout > 0, "maximum registration 
timeout must be greater than zero");
+               checkArgument(delayOnError >= 0, "delay on error must be 
non-negative");
+               checkArgument(delayOnRefusedRegistration >= 0, "delay on 
refused registration must be non-negative");
+
+               this.log = checkNotNull(log);
+               this.rpcService = checkNotNull(rpcService);
+               this.targetName = checkNotNull(targetName);
+               this.targetType = checkNotNull(targetType);
+               this.targetAddress = checkNotNull(targetAddress);
+               this.leaderId = checkNotNull(leaderId);
+               this.initialRegistrationTimeout = initialRegistrationTimeout;
+               this.maxRegistrationTimeout = maxRegistrationTimeout;
+               this.delayOnError = delayOnError;
+               this.delayOnRefusedRegistration = delayOnRefusedRegistration;
+
+               this.completionPromise = new DefaultPromise<>();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  completion and cancellation
+       // 
------------------------------------------------------------------------
+
+       public Future<Tuple2<Gateway, Success>> getFuture() {
+               return completionPromise.future();
+       }
+
+       /**
+        * Cancels the registration procedure.
+        */
+       public void cancel() {
+               canceled = true;
+       }
+
+       /**
+        * Checks if the registration was canceled.
+        * @return True if the registration was canceled, false otherwise.
+        */
+       public boolean isCanceled() {
+               return canceled;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  registration
+       // 
------------------------------------------------------------------------
+
+       protected abstract Future<RegistrationResponse> invokeRegistration(
+                       Gateway gateway, UUID leaderId, long timeoutMillis) 
throws Exception;
+
+       /**
+        * This method resolves the target address to a callable gateway and 
starts the
+        * registration after that.
+        */
+       @SuppressWarnings("unchecked")
+       public void startRegistration() {
+               try {
+                       // trigger resolution of the resource manager address 
to a callable gateway
+                       Future<Gateway> resourceManagerFuture = 
rpcService.connect(targetAddress, targetType);
+       
+                       // upon success, start the registration attempts
+                       resourceManagerFuture.onSuccess(new 
OnSuccess<Gateway>() {
+                               @Override
+                               public void onSuccess(Gateway result) {
+                                       log.info("Resolved {} address, 
beginning registration", targetName);
+                                       register(result, 1, 
initialRegistrationTimeout);
+                               }
+                       }, rpcService.getExecutionContext());
+       
+                       // upon failure, retry, unless this is cancelled
+                       resourceManagerFuture.onFailure(new OnFailure() {
+                               @Override
+                               public void onFailure(Throwable failure) {
+                                       if (!isCanceled()) {
+                                               log.warn("Could not resolve {} 
address {}, retrying...", targetName, targetAddress);
+                                               startRegistration();
+                                       }
+                               }
+                       }, rpcService.getExecutionContext());
+               }
+               catch (Throwable t) {
+                       cancel();
+                       completionPromise.tryFailure(t);
+               }
+       }
+
+       /**
+        * This method performs a registration attempt and triggers either a 
success notification or a retry,
+        * depending on the result.
+        */
+       @SuppressWarnings("unchecked")
+       private void register(final Gateway gateway, final int attempt, final 
long timeoutMillis) {
+               // eager check for canceling to avoid some unnecessary work
+               if (canceled) {
+                       return;
+               }
+
+               try {
+                       log.info("Registration at {} attempt {} 
(timeout={}ms)", targetName, attempt, timeoutMillis);
+                       Future<RegistrationResponse> registrationFuture = 
invokeRegistration(gateway, leaderId, timeoutMillis);
+       
+                       // if the registration was successful, let the 
TaskExecutor know
+                       registrationFuture.onSuccess(new 
OnSuccess<RegistrationResponse>() {
+                               
+                               @Override
+                               public void onSuccess(RegistrationResponse 
result) throws Throwable {
+                                       if (!isCanceled()) {
+                                               if (result instanceof 
RegistrationResponse.Success) {
+                                                       // registration 
successful!
+                                                       Success success = 
(Success) result;
+                                                       
completionPromise.success(new Tuple2<>(gateway, success));
+                                               }
+                                               else {
+                                                       // registration refused 
or unknown
+                                                       if (result instanceof 
RegistrationResponse.Decline) {
+                                                               
RegistrationResponse.Decline decline = (RegistrationResponse.Decline) result;
+                                                               
log.info("Registration at {} was declined: {}", targetName, 
decline.getReason());
+                                                       } else {
+                                                               
log.error("Received unknown response to registration attempt: " + result);
+                                                       }
+
+                                                       log.info("Pausing and 
re-attempting registration in {} ms", delayOnRefusedRegistration);
+                                                       registerLater(gateway, 
1, initialRegistrationTimeout, delayOnRefusedRegistration);
+                                               }
+                                       }
+                               }
+                       }, rpcService.getExecutionContext());
+       
+                       // upon failure, retry
+                       registrationFuture.onFailure(new OnFailure() {
+                               @Override
+                               public void onFailure(Throwable failure) {
+                                       if (!isCanceled()) {
+                                               if (failure instanceof 
TimeoutException) {
+                                                       // we simply have not 
received a response in time. maybe the timeout was
+                                                       // very low (initial 
fast registration attempts), maybe the target endpoint is
+                                                       // currently down.
+                                                       if 
(log.isDebugEnabled()) {
+                                                               
log.debug("Registration at {} ({}) attempt {} timed out after {} ms",
+                                                                               
targetName, targetAddress, attempt, timeoutMillis);
+                                                       }
+       
+                                                       long newTimeoutMillis = 
Math.min(2 * timeoutMillis, maxRegistrationTimeout);
+                                                       register(gateway, 
attempt + 1, newTimeoutMillis);
+                                               }
+                                               else {
+                                                       // a serious failure 
occurred. we still should not give up, but keep trying
+                                                       log.error("Registration 
at " + targetName + " failed due to an error", failure);
+                                                       log.info("Pausing and 
re-attempting registration in {} ms", delayOnError);
+       
+                                                       registerLater(gateway, 
1, initialRegistrationTimeout, delayOnError);
+                                               }
+                                       }
+                               }
+                       }, rpcService.getExecutionContext());
+               }
+               catch (Throwable t) {
+                       cancel();
+                       completionPromise.tryFailure(t);
+               }
+       }
+
+       private void registerLater(final Gateway gateway, final int attempt, 
final long timeoutMillis, long delay) {
+               rpcService.scheduleRunnable(new Runnable() {
+                       @Override
+                       public void run() {
+                               register(gateway, attempt, timeoutMillis);
+                       }
+               }, delay, TimeUnit.MILLISECONDS);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
new file mode 100644
index 0000000..309dcc1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import java.io.Serializable;
+
+public class JobMasterRegistration implements Serializable {
+       private static final long serialVersionUID = 8411214999193765202L;
+
+       private final String address;
+
+       public JobMasterRegistration(String address) {
+               this.address = address;
+       }
+
+       public String getAddress() {
+               return address;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
new file mode 100644
index 0000000..fb6c401
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.io.Serializable;
+
+public class RegistrationResponse implements Serializable {
+       private static final long serialVersionUID = -2379003255993119993L;
+
+       private final boolean isSuccess;
+       private final InstanceID instanceID;
+
+       public RegistrationResponse(boolean isSuccess, InstanceID instanceID) {
+               this.isSuccess = isSuccess;
+               this.instanceID = instanceID;
+       }
+
+       public boolean isSuccess() {
+               return isSuccess;
+       }
+
+       public InstanceID getInstanceID() {
+               return instanceID;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
new file mode 100644
index 0000000..44c022b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import akka.dispatch.Mapper;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+
+import scala.concurrent.Future;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * ResourceManager implementation. The resource manager is responsible for 
resource de-/allocation
+ * and bookkeeping.
+ *
+ * It offers the following methods as part of its rpc interface to interact 
with the him remotely:
+ * <ul>
+ *     <li>{@link #registerJobMaster(JobMasterRegistration)} registers a 
{@link JobMaster} at the resource manager</li>
+ *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource 
manager</li>
+ * </ul>
+ */
+public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
+       private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
+       private final HighAvailabilityServices highAvailabilityServices;
+       private LeaderElectionService leaderElectionService = null;
+       private UUID leaderSessionID = null;
+
+       public ResourceManager(RpcService rpcService, HighAvailabilityServices 
highAvailabilityServices) {
+               super(rpcService);
+               this.highAvailabilityServices = 
checkNotNull(highAvailabilityServices);
+               this.jobMasterGateways = new HashMap<>();
+       }
+
+       @Override
+       public void start() {
+               // start a leader
+               try {
+                       super.start();
+                       leaderElectionService = 
highAvailabilityServices.getResourceManagerLeaderElectionService();
+                       leaderElectionService.start(new 
ResourceManagerLeaderContender());
+               } catch (Throwable e) {
+                       log.error("A fatal error happened when starting the 
ResourceManager", e);
+                       throw new RuntimeException("A fatal error happened when 
starting the ResourceManager", e);
+               }
+       }
+
+       @Override
+       public void shutDown() {
+               try {
+                       leaderElectionService.stop();
+                       super.shutDown();
+               } catch(Throwable e) {
+                       log.error("A fatal error happened when shutdown the 
ResourceManager", e);
+                       throw new RuntimeException("A fatal error happened when 
shutdown the ResourceManager", e);
+               }
+       }
+
+       /**
+        * Gets the leader session id of current resourceManager.
+        *
+        * @return return the leaderSessionId of current resourceManager, this 
returns null until the current resourceManager is granted leadership.
+        */
+       @VisibleForTesting
+       UUID getLeaderSessionID() {
+               return leaderSessionID;
+       }
+
+       /**
+        * Register a {@link JobMaster} at the resource manager.
+        *
+        * @param jobMasterRegistration Job master registration information
+        * @return Future registration response
+        */
+       @RpcMethod
+       public Future<RegistrationResponse> 
registerJobMaster(JobMasterRegistration jobMasterRegistration) {
+               Future<JobMasterGateway> jobMasterFuture = 
getRpcService().connect(jobMasterRegistration.getAddress(), 
JobMasterGateway.class);
+
+               return jobMasterFuture.map(new Mapper<JobMasterGateway, 
RegistrationResponse>() {
+                       @Override
+                       public RegistrationResponse apply(final 
JobMasterGateway jobMasterGateway) {
+                               InstanceID instanceID;
+
+                               if 
(jobMasterGateways.containsKey(jobMasterGateway)) {
+                                       instanceID = 
jobMasterGateways.get(jobMasterGateway);
+                               } else {
+                                       instanceID = new InstanceID();
+                                       jobMasterGateways.put(jobMasterGateway, 
instanceID);
+                               }
+
+                               return new RegistrationResponse(true, 
instanceID);
+                       }
+               }, getMainThreadExecutionContext());
+       }
+
+       /**
+        * Requests a slot from the resource manager.
+        *
+        * @param slotRequest Slot request
+        * @return Slot assignment
+        */
+       @RpcMethod
+       public SlotAssignment requestSlot(SlotRequest slotRequest) {
+               System.out.println("SlotRequest: " + slotRequest);
+               return new SlotAssignment();
+       }
+
+
+       /**
+        *
+        * @param resourceManagerLeaderId  The fencing token for the 
ResourceManager leader 
+        * @param taskExecutorAddress      The address of the TaskExecutor that 
registers
+        * @param resourceID               The resource ID of the TaskExecutor 
that registers
+        *
+        * @return The response by the ResourceManager.
+        */
+       @RpcMethod
+       public org.apache.flink.runtime.registration.RegistrationResponse 
registerTaskExecutor(
+                       UUID resourceManagerLeaderId,
+                       String taskExecutorAddress,
+                       ResourceID resourceID) {
+
+               return new TaskExecutorRegistrationSuccess(new InstanceID(), 
5000);
+       }
+
+       private class ResourceManagerLeaderContender implements LeaderContender 
{
+
+               /**
+                * Callback method when current resourceManager is granted 
leadership
+                *
+                * @param leaderSessionID unique leadershipID
+                */
+               @Override
+               public void grantLeadership(final UUID leaderSessionID) {
+                       runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       log.info("ResourceManager {} was 
granted leadership with leader session ID {}", getAddress(), leaderSessionID);
+                                       ResourceManager.this.leaderSessionID = 
leaderSessionID;
+                                       // confirming the leader session ID 
might be blocking,
+                                       
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
+                               }
+                       });
+               }
+
+               /**
+                * Callback method when current resourceManager lose leadership.
+                */
+               @Override
+               public void revokeLeadership() {
+                       runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       log.info("ResourceManager {} was 
revoked leadership.", getAddress());
+                                       jobMasterGateways.clear();
+                                       leaderSessionID = null;
+                               }
+                       });
+               }
+
+               @Override
+               public String getAddress() {
+                       return ResourceManager.this.getAddress();
+               }
+
+               /**
+                * Handles error occurring in the leader election service
+                *
+                * @param exception Exception being thrown in the leader 
election service
+                */
+               @Override
+               public void handleError(final Exception exception) {
+                       runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       log.error("ResourceManager received an 
error from the LeaderElectionService.", exception);
+                                       // terminate ResourceManager in case of 
an error
+                                       shutDown();
+                               }
+                       });
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
new file mode 100644
index 0000000..b5782b0
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+
+/**
+ * The {@link ResourceManager}'s RPC gateway interface.
+ */
+public interface ResourceManagerGateway extends RpcGateway {
+
+       /**
+        * Register a {@link JobMaster} at the resource manager.
+        *
+        * @param jobMasterRegistration Job master registration information
+        * @param timeout Timeout for the future to complete
+        * @return Future registration response
+        */
+       Future<RegistrationResponse> registerJobMaster(
+               JobMasterRegistration jobMasterRegistration,
+               @RpcTimeout FiniteDuration timeout);
+
+       /**
+        * Register a {@link JobMaster} at the resource manager.
+        *
+        * @param jobMasterRegistration Job master registration information
+        * @return Future registration response
+        */
+       Future<RegistrationResponse> registerJobMaster(JobMasterRegistration 
jobMasterRegistration);
+
+       /**
+        * Requests a slot from the resource manager.
+        *
+        * @param slotRequest Slot request
+        * @return Future slot assignment
+        */
+       Future<SlotAssignment> requestSlot(SlotRequest slotRequest);
+
+       /**
+        * 
+        * @param resourceManagerLeaderId  The fencing token for the 
ResourceManager leader 
+        * @param taskExecutorAddress      The address of the TaskExecutor that 
registers
+        * @param resourceID               The resource ID of the TaskExecutor 
that registers
+        * @param timeout                  The timeout for the response.
+        * 
+        * @return The future to the response by the ResourceManager.
+        */
+       Future<org.apache.flink.runtime.registration.RegistrationResponse> 
registerTaskExecutor(
+                       UUID resourceManagerLeaderId,
+                       String taskExecutorAddress,
+                       ResourceID resourceID,
+                       @RpcTimeout FiniteDuration timeout);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotAssignment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotAssignment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotAssignment.java
new file mode 100644
index 0000000..695204d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotAssignment.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import java.io.Serializable;
+
+public class SlotAssignment implements Serializable{
+       private static final long serialVersionUID = -6990813455942742322L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotManager.java
new file mode 100644
index 0000000..5c06648
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotManager.java
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * SlotManager is responsible for receiving slot requests and do slot 
allocations. It allows to request
+ * slots from registered TaskManagers and issues container allocation requests 
in case of there are not
+ * enough available slots. Besides, it should sync its slot allocation with 
TaskManager's heartbeat.
+ * <p>
+ * The main operation principle of SlotManager is:
+ * <ul>
+ * <li>1. All slot allocation status should be synced with TaskManager, which 
is the ground truth.</li>
+ * <li>2. All slots that have registered must be tracked, either by free pool 
or allocated pool.</li>
+ * <li>3. All slot requests will be handled by best efforts, there is no 
guarantee that one request will be
+ * fulfilled in time or correctly allocated. Conflicts or timeout or some 
special error will happen, it should
+ * be handled outside SlotManager. SlotManager will make each decision based 
on the information it currently
+ * holds.</li>
+ * </ul>
+ * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
+ */
+public abstract class SlotManager {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(SlotManager.class);
+
+       /** Gateway to communicate with ResourceManager */
+       private final ResourceManagerGateway resourceManagerGateway;
+
+       /** All registered slots, including free and allocated slots */
+       private final Map<ResourceID, Map<SlotID, ResourceSlot>> 
registeredSlots;
+
+       /** All pending slot requests, waiting available slots to fulfil */
+       private final Map<AllocationID, SlotRequest> pendingSlotRequests;
+
+       /** All free slots that can be used to be allocated */
+       private final Map<SlotID, ResourceSlot> freeSlots;
+
+       /** All allocations, we can lookup allocations either by SlotID or 
AllocationID */
+       private final AllocationMap allocationMap;
+
+       public SlotManager(ResourceManagerGateway resourceManagerGateway) {
+               this.resourceManagerGateway = 
checkNotNull(resourceManagerGateway);
+               this.registeredSlots = new HashMap<>(16);
+               this.pendingSlotRequests = new LinkedHashMap<>(16);
+               this.freeSlots = new HashMap<>(16);
+               this.allocationMap = new AllocationMap();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  slot managements
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Request a slot with requirements, we may either fulfill the request 
or pending it. Trigger container
+        * allocation if we don't have enough resource. If we have free slot 
which can match the request, record
+        * this allocation and forward the request to TaskManager through 
ResourceManager (we want this done by
+        * RPC's main thread to avoid race condition).
+        *
+        * @param request The detailed request of the slot
+        */
+       public void requestSlot(final SlotRequest request) {
+               if (isRequestDuplicated(request)) {
+                       LOG.warn("Duplicated slot request, AllocationID:{}", 
request.getAllocationId());
+                       return;
+               }
+
+               // try to fulfil the request with current free slots
+               ResourceSlot slot = chooseSlotToUse(request, freeSlots);
+               if (slot != null) {
+                       LOG.info("Assigning SlotID({}) to AllocationID({}), 
JobID:{}", slot.getSlotId(),
+                               request.getAllocationId(), request.getJobId());
+
+                       // record this allocation in bookkeeping
+                       allocationMap.addAllocation(slot.getSlotId(), 
request.getAllocationId());
+
+                       // remove selected slot from free pool
+                       freeSlots.remove(slot.getSlotId());
+
+                       // TODO: send slot request to TaskManager
+               } else {
+                       LOG.info("Cannot fulfil slot request, try to allocate a 
new container for it, " +
+                               "AllocationID:{}, JobID:{}", 
request.getAllocationId(), request.getJobId());
+                       allocateContainer(request.getResourceProfile());
+                       pendingSlotRequests.put(request.getAllocationId(), 
request);
+               }
+       }
+
+       /**
+        * Sync slot status with TaskManager's SlotReport.
+        */
+       public void updateSlotStatus(final SlotReport slotReport) {
+               for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
+                       updateSlotStatus(slotStatus);
+               }
+       }
+
+       /**
+        * The slot request to TaskManager may be either failed by rpc 
communication (timeout, network error, etc.)
+        * or really rejected by TaskManager. We shall retry this request by:
+        * <ul>
+        * <li>1. verify and clear all the previous allocate information for 
this request
+        * <li>2. try to request slot again
+        * </ul>
+        * <p>
+        * This may cause some duplicate allocation, e.g. the slot request to 
TaskManager is successful but the response
+        * is lost somehow, so we may request a slot in another TaskManager, 
this causes two slots assigned to one request,
+        * but it can be taken care of by rejecting registration at JobManager.
+        *
+        * @param originalRequest The original slot request
+        * @param slotId          The target SlotID
+        */
+       public void handleSlotRequestFailedAtTaskManager(final SlotRequest 
originalRequest, final SlotID slotId) {
+               final AllocationID originalAllocationId = 
originalRequest.getAllocationId();
+               LOG.info("Slot request failed at TaskManager, SlotID:{}, 
AllocationID:{}, JobID:{}",
+                       slotId, originalAllocationId, 
originalRequest.getJobId());
+
+               // verify the allocation info before we do anything
+               if (freeSlots.containsKey(slotId)) {
+                       // this slot is currently empty, no need to de-allocate 
it from our allocations
+                       LOG.info("Original slot is somehow empty, retrying this 
request");
+
+                       // before retry, we should double check whether this 
request was allocated by some other ways
+                       if (!allocationMap.isAllocated(originalAllocationId)) {
+                               requestSlot(originalRequest);
+                       } else {
+                               LOG.info("The failed request has somehow been 
allocated, SlotID:{}",
+                                       
allocationMap.getSlotID(originalAllocationId));
+                       }
+               } else if (allocationMap.isAllocated(slotId)) {
+                       final AllocationID currentAllocationId = 
allocationMap.getAllocationID(slotId);
+
+                       // check whether we have an agreement on whom this slot 
belongs to
+                       if (originalAllocationId.equals(currentAllocationId)) {
+                               LOG.info("De-allocate this request and retry");
+                               
allocationMap.removeAllocation(currentAllocationId);
+
+                               // put this slot back to free pool
+                               ResourceSlot slot = 
checkNotNull(getRegisteredSlot(slotId));
+                               freeSlots.put(slotId, slot);
+
+                               // retry the request
+                               requestSlot(originalRequest);
+                       } else {
+                               // the slot is taken by someone else, no need 
to de-allocate it from our allocations
+                               LOG.info("Original slot is taken by someone 
else, current AllocationID:{}", currentAllocationId);
+
+                               // before retry, we should double check whether 
this request was allocated by some other ways
+                               if 
(!allocationMap.isAllocated(originalAllocationId)) {
+                                       requestSlot(originalRequest);
+                               } else {
+                                       LOG.info("The failed request is somehow 
been allocated, SlotID:{}",
+                                               
allocationMap.getSlotID(originalAllocationId));
+                               }
+                       }
+               } else {
+                       LOG.error("BUG! {} is neither in free pool nor in 
allocated pool", slotId);
+               }
+       }
+
+       /**
+        * Callback for TaskManager failures. In case that a TaskManager fails, 
we have to clean up all its slots.
+        *
+        * @param resourceId The ResourceID of the TaskManager
+        */
+       public void notifyTaskManagerFailure(final ResourceID resourceId) {
+               LOG.info("Resource:{} been notified failure", resourceId);
+               final Map<SlotID, ResourceSlot> slotIdsToRemove = 
registeredSlots.remove(resourceId);
+               if (slotIdsToRemove != null) {
+                       for (SlotID slotId : slotIdsToRemove.keySet()) {
+                               LOG.info("Removing Slot:{} upon resource 
failure", slotId);
+                               if (freeSlots.containsKey(slotId)) {
+                                       freeSlots.remove(slotId);
+                               } else if (allocationMap.isAllocated(slotId)) {
+                                       allocationMap.removeAllocation(slotId);
+                               } else {
+                                       LOG.error("BUG! {} is neither in free 
pool nor in allocated pool", slotId);
+                               }
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  internal behaviors
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Update slot status based on TaskManager's report. There are mainly 
two situations when we receive the report:
+        * <ul>
+        * <li>1. The slot is newly registered.</li>
+        * <li>2. The slot has registered, it contains its current status.</li>
+        * </ul>
+        * <p>
+        * Regarding 1: It's fairly simple, we just record this slot's status, 
and trigger schedule if slot is empty.
+        * <p>
+        * Regarding 2: It will cause some weird situation since we may have 
some time-gap on how the slot's status really
+        * is. We may have some updates on the slot's allocation, but it 
doesn't reflected by TaskManager's heartbeat yet,
+        * and we may make some wrong decision if we cannot guarantee we have 
the exact status about all the slots. So
+        * the principle here is: We always trust TaskManager's heartbeat, we 
will correct our information based on that
+        * and take next action based on the diff between our information and 
heartbeat status.
+        *
+        * @param reportedStatus Reported slot status
+        */
+       void updateSlotStatus(final SlotStatus reportedStatus) {
+               final SlotID slotId = reportedStatus.getSlotID();
+               final ResourceSlot slot = new ResourceSlot(slotId, 
reportedStatus.getProfiler());
+
+               if (registerNewSlot(slot)) {
+                       // we have a newly registered slot
+                       LOG.info("New slot appeared, SlotID:{}, 
AllocationID:{}", slotId, reportedStatus.getAllocationID());
+
+                       if (reportedStatus.getAllocationID() != null) {
+                               // slot in use, record this in bookkeeping
+                               allocationMap.addAllocation(slotId, 
reportedStatus.getAllocationID());
+                       } else {
+                               handleFreeSlot(new ResourceSlot(slotId, 
reportedStatus.getProfiler()));
+                       }
+               } else {
+                       // slot exists, update current information
+                       if (reportedStatus.getAllocationID() != null) {
+                               // slot is reported in use
+                               final AllocationID reportedAllocationId = 
reportedStatus.getAllocationID();
+
+                               // check whether we also thought this slot is 
in use
+                               if (allocationMap.isAllocated(slotId)) {
+                                       // we also think that slot is in use, 
check whether the AllocationID matches
+                                       final AllocationID currentAllocationId 
= allocationMap.getAllocationID(slotId);
+
+                                       if 
(!reportedAllocationId.equals(currentAllocationId)) {
+                                               LOG.info("Slot allocation info 
mismatch! SlotID:{}, current:{}, reported:{}",
+                                                       slotId, 
currentAllocationId, reportedAllocationId);
+
+                                               // seems we have a disagreement 
about the slot assignments, need to correct it
+                                               
allocationMap.removeAllocation(slotId);
+                                               
allocationMap.addAllocation(slotId, reportedAllocationId);
+                                       }
+                               } else {
+                                       LOG.info("Slot allocation info 
mismatch! SlotID:{}, current:null, reported:{}",
+                                               slotId, reportedAllocationId);
+
+                                       // we thought the slot is free, should 
correct this information
+                                       allocationMap.addAllocation(slotId, 
reportedStatus.getAllocationID());
+
+                                       // remove this slot from free slots pool
+                                       freeSlots.remove(slotId);
+                               }
+                       } else {
+                               // slot is reported empty
+
+                               // check whether we also thought this slot is 
empty
+                               if (allocationMap.isAllocated(slotId)) {
+                                       LOG.info("Slot allocation info 
mismatch! SlotID:{}, current:{}, reported:null",
+                                               slotId, 
allocationMap.getAllocationID(slotId));
+
+                                       // we thought the slot is in use, 
correct it
+                                       allocationMap.removeAllocation(slotId);
+
+                                       // we have a free slot!
+                                       handleFreeSlot(new ResourceSlot(slotId, 
reportedStatus.getProfiler()));
+                               }
+                       }
+               }
+       }
+
+       /**
+        * When we have a free slot, try to fulfill the pending request first. 
If any request can be fulfilled,
+        * record this allocation in bookkeeping and send slot request to 
TaskManager, else we just add this slot
+        * to the free pool.
+        *
+        * @param freeSlot The free slot
+        */
+       private void handleFreeSlot(final ResourceSlot freeSlot) {
+               SlotRequest chosenRequest = chooseRequestToFulfill(freeSlot, 
pendingSlotRequests);
+
+               if (chosenRequest != null) {
+                       
pendingSlotRequests.remove(chosenRequest.getAllocationId());
+
+                       LOG.info("Assigning SlotID({}) to AllocationID({}), 
JobID:{}", freeSlot.getSlotId(),
+                               chosenRequest.getAllocationId(), 
chosenRequest.getJobId());
+                       allocationMap.addAllocation(freeSlot.getSlotId(), 
chosenRequest.getAllocationId());
+
+                       // TODO: send slot request to TaskManager
+               } else {
+                       freeSlots.put(freeSlot.getSlotId(), freeSlot);
+               }
+       }
+
+       /**
+        * Check whether the request is duplicated. We use AllocationID to 
identify slot request, for each
+        * formerly received slot request, it is either in pending list or 
already been allocated.
+        *
+        * @param request The slot request
+        * @return <tt>true</tt> if the request is duplicated
+        */
+       private boolean isRequestDuplicated(final SlotRequest request) {
+               final AllocationID allocationId = request.getAllocationId();
+               return pendingSlotRequests.containsKey(allocationId)
+                       || allocationMap.isAllocated(allocationId);
+       }
+
+       /**
+        * Try to register slot, and tell if this slot is newly registered.
+        *
+        * @param slot The ResourceSlot which will be checked and registered
+        * @return <tt>true</tt> if we meet a new slot
+        */
+       private boolean registerNewSlot(final ResourceSlot slot) {
+               final SlotID slotId = slot.getSlotId();
+               final ResourceID resourceId = slotId.getResourceID();
+               if (!registeredSlots.containsKey(resourceId)) {
+                       registeredSlots.put(resourceId, new HashMap<SlotID, 
ResourceSlot>());
+               }
+               return registeredSlots.get(resourceId).put(slotId, slot) == 
null;
+       }
+
+       private ResourceSlot getRegisteredSlot(final SlotID slotId) {
+               final ResourceID resourceId = slotId.getResourceID();
+               if (!registeredSlots.containsKey(resourceId)) {
+                       return null;
+               }
+               return registeredSlots.get(resourceId).get(slotId);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Framework specific behavior
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Choose a slot to use among all free slots, the behavior is framework 
specified.
+        *
+        * @param request   The slot request
+        * @param freeSlots All slots which can be used
+        * @return The slot we choose to use, <tt>null</tt> if we did not find 
a match
+        */
+       protected abstract ResourceSlot chooseSlotToUse(final SlotRequest 
request,
+               final Map<SlotID, ResourceSlot> freeSlots);
+
+       /**
+        * Choose a pending request to fulfill when we have a free slot, the 
behavior is framework specified.
+        *
+        * @param offeredSlot     The free slot
+        * @param pendingRequests All the pending slot requests
+        * @return The chosen SlotRequest, <tt>null</tt> if we did not find a 
match
+        */
+       protected abstract SlotRequest chooseRequestToFulfill(final 
ResourceSlot offeredSlot,
+               final Map<AllocationID, SlotRequest> pendingRequests);
+
+       /**
+        * The framework specific code for allocating a container for specified 
resource profile.
+        *
+        * @param resourceProfile The resource profile
+        */
+       protected abstract void allocateContainer(final ResourceProfile 
resourceProfile);
+
+
+       // 
------------------------------------------------------------------------
+       //  Helper classes
+       // 
------------------------------------------------------------------------
+
+       /**
+        * We maintain all the allocations with SlotID and AllocationID. We are 
able to get or remove the allocation info
+        * either by SlotID or AllocationID.
+        */
+       private static class AllocationMap {
+
+               /** All allocated slots (by SlotID) */
+               private final Map<SlotID, AllocationID> allocatedSlots;
+
+               /** All allocated slots (by AllocationID), it'a a inverse view 
of allocatedSlots */
+               private final Map<AllocationID, SlotID> 
allocatedSlotsByAllocationId;
+
+               AllocationMap() {
+                       this.allocatedSlots = new HashMap<>(16);
+                       this.allocatedSlotsByAllocationId = new HashMap<>(16);
+               }
+
+               /**
+                * Add a allocation
+                *
+                * @param slotId       The slot id
+                * @param allocationId The allocation id
+                */
+               void addAllocation(final SlotID slotId, final AllocationID 
allocationId) {
+                       allocatedSlots.put(slotId, allocationId);
+                       allocatedSlotsByAllocationId.put(allocationId, slotId);
+               }
+
+               /**
+                * De-allocation with slot id
+                *
+                * @param slotId The slot id
+                */
+               void removeAllocation(final SlotID slotId) {
+                       if (allocatedSlots.containsKey(slotId)) {
+                               final AllocationID allocationId = 
allocatedSlots.get(slotId);
+                               allocatedSlots.remove(slotId);
+                               
allocatedSlotsByAllocationId.remove(allocationId);
+                       }
+               }
+
+               /**
+                * De-allocation with allocation id
+                *
+                * @param allocationId The allocation id
+                */
+               void removeAllocation(final AllocationID allocationId) {
+                       if 
(allocatedSlotsByAllocationId.containsKey(allocationId)) {
+                               SlotID slotId = 
allocatedSlotsByAllocationId.get(allocationId);
+                               
allocatedSlotsByAllocationId.remove(allocationId);
+                               allocatedSlots.remove(slotId);
+                       }
+               }
+
+               /**
+                * Check whether allocation exists by slot id
+                *
+                * @param slotId The slot id
+                * @return true if the allocation exists
+                */
+               boolean isAllocated(final SlotID slotId) {
+                       return allocatedSlots.containsKey(slotId);
+               }
+
+               /**
+                * Check whether allocation exists by allocation id
+                *
+                * @param allocationId The allocation id
+                * @return true if the allocation exists
+                */
+               boolean isAllocated(final AllocationID allocationId) {
+                       return 
allocatedSlotsByAllocationId.containsKey(allocationId);
+               }
+
+               AllocationID getAllocationID(final SlotID slotId) {
+                       return allocatedSlots.get(slotId);
+               }
+
+               SlotID getSlotID(final AllocationID allocationId) {
+                       return allocatedSlotsByAllocationId.get(allocationId);
+               }
+
+               public int size() {
+                       return allocatedSlots.size();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Testing utilities
+       // 
------------------------------------------------------------------------
+
+       @VisibleForTesting
+       boolean isAllocated(final SlotID slotId) {
+               return allocationMap.isAllocated(slotId);
+       }
+
+       @VisibleForTesting
+       boolean isAllocated(final AllocationID allocationId) {
+               return allocationMap.isAllocated(allocationId);
+       }
+
+       /**
+        * Add free slots directly to the free pool, this will not trigger 
pending requests allocation
+        *
+        * @param slot The resource slot
+        */
+       @VisibleForTesting
+       void addFreeSlot(final ResourceSlot slot) {
+               final ResourceID resourceId = slot.getResourceID();
+               final SlotID slotId = slot.getSlotId();
+
+               if (!registeredSlots.containsKey(resourceId)) {
+                       registeredSlots.put(resourceId, new HashMap<SlotID, 
ResourceSlot>());
+               }
+               registeredSlots.get(resourceId).put(slot.getSlotId(), slot);
+               freeSlots.put(slotId, slot);
+       }
+
+       @VisibleForTesting
+       int getAllocatedSlotCount() {
+               return allocationMap.size();
+       }
+
+       @VisibleForTesting
+       int getFreeSlotCount() {
+               return freeSlots.size();
+       }
+
+       @VisibleForTesting
+       int getPendingRequestCount() {
+               return pendingSlotRequests.size();
+       }
+}

Reply via email to