http://git-wip-us.apache.org/repos/asf/flink/blob/2f12ba3e/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java
new file mode 100644
index 0000000..52d9d06
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class SlotManagerTest {
+
+       private static final double DEFAULT_TESTING_CPU_CORES = 1.0;
+
+       private static final long DEFAULT_TESTING_MEMORY = 512;
+
+       private static final ResourceProfile DEFAULT_TESTING_PROFILE =
+               new ResourceProfile(DEFAULT_TESTING_CPU_CORES, 
DEFAULT_TESTING_MEMORY);
+
+       private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE =
+               new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, 
DEFAULT_TESTING_MEMORY * 2);
+
+       private ResourceManagerGateway resourceManagerGateway;
+
+       @Before
+       public void setUp() {
+               resourceManagerGateway = mock(ResourceManagerGateway.class);
+       }
+
+       /**
+        * Tests requesting a slot when there are no free slots: a container must be allocated 
from the cluster manager master
+        */
+       @Test
+       public void testRequestSlotWithoutFreeSlot() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
+
+               assertEquals(0, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(1, slotManager.getPendingRequestCount());
+               assertEquals(1, slotManager.getAllocatedContainers().size());
+               assertEquals(DEFAULT_TESTING_PROFILE, 
slotManager.getAllocatedContainers().get(0));
+       }
+
+       /**
+        * Tests that there are some free slots when we request, and the 
request is fulfilled immediately
+        */
+       @Test
+       public void testRequestSlotWithFreeSlot() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+
+               directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 
1);
+               assertEquals(1, slotManager.getFreeSlotCount());
+
+               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+               assertEquals(0, slotManager.getAllocatedContainers().size());
+       }
+
+       /**
+        * Tests that there are some free slots when we request, but none of 
them are suitable
+        */
+       @Test
+       public void testRequestSlotWithoutSuitableSlot() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+
+               directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 
2);
+               assertEquals(2, slotManager.getFreeSlotCount());
+
+               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+               assertEquals(0, slotManager.getAllocatedSlotCount());
+               assertEquals(2, slotManager.getFreeSlotCount());
+               assertEquals(1, slotManager.getPendingRequestCount());
+               assertEquals(1, slotManager.getAllocatedContainers().size());
+               assertEquals(DEFAULT_TESTING_BIG_PROFILE, 
slotManager.getAllocatedContainers().get(0));
+       }
+
+       /**
+        * Tests that duplicated slot requests are only processed once
+        */
+       @Test
+       public void testDuplicatedSlotRequest() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 
1);
+
+               SlotRequest request1 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
+               SlotRequest request2 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_BIG_PROFILE);
+
+               slotManager.requestSlot(request1);
+               slotManager.requestSlot(request2);
+               slotManager.requestSlot(request2);
+               slotManager.requestSlot(request1);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(1, slotManager.getPendingRequestCount());
+               assertEquals(1, slotManager.getAllocatedContainers().size());
+               assertEquals(DEFAULT_TESTING_BIG_PROFILE, 
slotManager.getAllocatedContainers().get(0));
+       }
+
+       /**
+        * Tests that we send multiple slot requests
+        */
+       @Test
+       public void testRequestMultipleSlots() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 
5);
+
+               // request 3 normal slots
+               for (int i = 0; i < 3; ++i) {
+                       slotManager.requestSlot(new SlotRequest(new JobID(), 
new AllocationID(), DEFAULT_TESTING_PROFILE));
+               }
+
+               // request 2 big slots
+               for (int i = 0; i < 2; ++i) {
+                       slotManager.requestSlot(new SlotRequest(new JobID(), 
new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+               }
+
+               // request 1 normal slot again
+               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
+
+               assertEquals(4, slotManager.getAllocatedSlotCount());
+               assertEquals(1, slotManager.getFreeSlotCount());
+               assertEquals(2, slotManager.getPendingRequestCount());
+               assertEquals(2, slotManager.getAllocatedContainers().size());
+               assertEquals(DEFAULT_TESTING_BIG_PROFILE, 
slotManager.getAllocatedContainers().get(0));
+               assertEquals(DEFAULT_TESTING_BIG_PROFILE, 
slotManager.getAllocatedContainers().get(1));
+       }
+
+       /**
+        * Tests that a new slot appeared in SlotReport, and we used it to 
fulfill a pending request
+        */
+       @Test
+       public void testNewlyAppearedFreeSlotFulfillPendingRequest() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
+               assertEquals(1, slotManager.getPendingRequestCount());
+
+               SlotID slotId = SlotID.generate();
+               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+               assertTrue(slotManager.isAllocated(slotId));
+       }
+
+       /**
+        * Tests that a new slot appeared in SlotReport, but we have no pending 
request
+        */
+       @Test
+       public void testNewlyAppearedFreeSlot() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               SlotID slotId = SlotID.generate();
+               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(0, slotManager.getAllocatedSlotCount());
+               assertEquals(1, slotManager.getFreeSlotCount());
+       }
+
+       /**
+        * Tests that a new slot appeared in SlotReport, but it's not suitable 
for any of the pending requests
+        */
+       @Test
+       public void testNewlyAppearedFreeSlotNotMatchPendingRequests() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+               assertEquals(1, slotManager.getPendingRequestCount());
+
+               SlotID slotId = SlotID.generate();
+               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(0, slotManager.getAllocatedSlotCount());
+               assertEquals(1, slotManager.getFreeSlotCount());
+               assertEquals(1, slotManager.getPendingRequestCount());
+               assertFalse(slotManager.isAllocated(slotId));
+       }
+
+       /**
+        * Tests that a new slot appeared in SlotReport, and it is reported 
as being in use by some job
+        */
+       @Test
+       public void testNewlyAppearedInUseSlot() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+
+               SlotID slotId = SlotID.generate();
+               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID());
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertTrue(slotManager.isAllocated(slotId));
+       }
+
+       /**
+        * Tests that we had a slot in-use, and it's confirmed by SlotReport
+        */
+       @Test
+       public void testExistingInUseSlotUpdateStatus() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
+               slotManager.requestSlot(request);
+
+               // make this slot in use
+               SlotID slotId = SlotID.generate();
+               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertTrue(slotManager.isAllocated(slotId));
+
+               // slot status is confirmed
+               SlotStatus slotStatus2 = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE,
+                       request.getAllocationId(), request.getJobId());
+               slotManager.updateSlotStatus(slotStatus2);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertTrue(slotManager.isAllocated(slotId));
+       }
+
+       /**
+        * Tests that we had a slot in-use, but it's empty according to the 
SlotReport
+        */
+       @Test
+       public void testExistingInUseSlotAdjustedToEmpty() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               SlotRequest request1 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
+               slotManager.requestSlot(request1);
+
+               // make this slot in use
+               SlotID slotId = SlotID.generate();
+               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
+               slotManager.updateSlotStatus(slotStatus);
+
+               // another request pending
+               SlotRequest request2 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
+               slotManager.requestSlot(request2);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(1, slotManager.getPendingRequestCount());
+               assertTrue(slotManager.isAllocated(slotId));
+               assertTrue(slotManager.isAllocated(request1.getAllocationId()));
+
+
+               // but slot is reported empty again, request2 will be 
fulfilled, request1 will be missing
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+               assertTrue(slotManager.isAllocated(slotId));
+               assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+       }
+
+       /**
+        * Tests that we had a slot in use, and it's also reported in use by 
TaskManager, but the allocation
+        * information didn't match.
+        */
+       @Test
+       public void testExistingInUseSlotWithDifferentAllocationInfo() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
+               slotManager.requestSlot(request);
+
+               // make this slot in use
+               SlotID slotId = SlotID.generate();
+               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+               assertTrue(slotManager.isAllocated(slotId));
+               assertTrue(slotManager.isAllocated(request.getAllocationId()));
+
+               SlotStatus slotStatus2 = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID());
+               // update slot status with different allocation info
+               slotManager.updateSlotStatus(slotStatus2);
+
+               // original request is missing and won't be allocated
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+               assertTrue(slotManager.isAllocated(slotId));
+               assertFalse(slotManager.isAllocated(request.getAllocationId()));
+               
assertTrue(slotManager.isAllocated(slotStatus2.getAllocationID()));
+       }
+
+       /**
+        * Tests that we had a free slot, and it's confirmed by SlotReport
+        */
+       @Test
+       public void testExistingEmptySlotUpdateStatus() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               ResourceSlot slot = new ResourceSlot(SlotID.generate(), 
DEFAULT_TESTING_PROFILE);
+               slotManager.addFreeSlot(slot);
+
+               SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), 
DEFAULT_TESTING_PROFILE);
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(0, slotManager.getAllocatedSlotCount());
+               assertEquals(1, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+       }
+
+       /**
+        * Tests that we had a free slot, and it's reported in-use by 
TaskManager
+        */
+       @Test
+       public void testExistingEmptySlotAdjustedToInUse() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               ResourceSlot slot = new ResourceSlot(SlotID.generate(), 
DEFAULT_TESTING_PROFILE);
+               slotManager.addFreeSlot(slot);
+
+               SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), 
DEFAULT_TESTING_PROFILE,
+                       new AllocationID(), new JobID());
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+               assertTrue(slotManager.isAllocated(slot.getSlotId()));
+       }
+
+       /**
+        * Tests that an allocation which failed at / was rejected by the 
TaskManager is retried
+        */
+       @Test
+       public void testSlotAllocationFailedAtTaskManager() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               ResourceSlot slot = new ResourceSlot(SlotID.generate(), 
DEFAULT_TESTING_PROFILE);
+               slotManager.addFreeSlot(slot);
+
+               SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
+               slotManager.requestSlot(request);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+               assertTrue(slotManager.isAllocated(slot.getSlotId()));
+
+               slotManager.handleSlotRequestFailedAtTaskManager(request, 
slot.getSlotId());
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+       }
+
+
+       /**
+        * Tests that an allocation failed at / was rejected by the 
TaskManager while the slot is already occupied by another request
+        */
+       @Test
+       public void testSlotAllocationFailedAtTaskManagerOccupiedByOther() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               ResourceSlot slot = new ResourceSlot(SlotID.generate(), 
DEFAULT_TESTING_PROFILE);
+               slotManager.addFreeSlot(slot);
+
+               SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
+               slotManager.requestSlot(request);
+
+               // slot is set empty by heartbeat
+               SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), 
slot.getResourceProfile());
+               slotManager.updateSlotStatus(slotStatus);
+
+               // another request took this slot
+               SlotRequest request2 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
+               slotManager.requestSlot(request2);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+               assertFalse(slotManager.isAllocated(request.getAllocationId()));
+               assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+
+               // original request should go back to pending
+               slotManager.handleSlotRequestFailedAtTaskManager(request, 
slot.getSlotId());
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(1, slotManager.getPendingRequestCount());
+               assertFalse(slotManager.isAllocated(request.getAllocationId()));
+               assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+       }
+
+       @Test
+       public void testNotifyTaskManagerFailure() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+
+               ResourceID resource1 = ResourceID.generate();
+               ResourceID resource2 = ResourceID.generate();
+
+               ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 
1), DEFAULT_TESTING_PROFILE);
+               ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 
2), DEFAULT_TESTING_PROFILE);
+               ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 
1), DEFAULT_TESTING_PROFILE);
+               ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 
2), DEFAULT_TESTING_PROFILE);
+
+               slotManager.addFreeSlot(slot11);
+               slotManager.addFreeSlot(slot21);
+
+               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
+               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
+
+               assertEquals(2, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+
+               slotManager.addFreeSlot(slot12);
+               slotManager.addFreeSlot(slot22);
+
+               assertEquals(2, slotManager.getAllocatedSlotCount());
+               assertEquals(2, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+
+               slotManager.notifyTaskManagerFailure(resource2);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(1, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+
+               // notify the failure of a resource that does not exist
+               slotManager.notifyTaskManagerFailure(ResourceID.generate());
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(1, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+       }
+
+       // 
------------------------------------------------------------------------
+       //  testing utilities
+       // 
------------------------------------------------------------------------
+
+       private void directlyProvideFreeSlots(
+               final SlotManager slotManager,
+               final ResourceProfile resourceProfile,
+               final int freeSlotNum)
+       {
+               for (int i = 0; i < freeSlotNum; ++i) {
+                       slotManager.addFreeSlot(new 
ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile)));
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  testing classes
+       // 
------------------------------------------------------------------------
+
+       private static class TestingSlotManager extends SlotManager {
+
+               private final List<ResourceProfile> allocatedContainers;
+
+               TestingSlotManager(ResourceManagerGateway 
resourceManagerGateway) {
+                       super(resourceManagerGateway);
+                       this.allocatedContainers = new LinkedList<>();
+               }
+
+               /**
+                * Choose an arbitrary slot (the first in iteration order) that matches the requirement
+                *
+                * @param request   The slot request
+                * @param freeSlots All slots which can be used
+                * @return The chosen slot or null if cannot find a match
+                */
+               @Override
+               protected ResourceSlot chooseSlotToUse(SlotRequest request, 
Map<SlotID, ResourceSlot> freeSlots) {
+                       for (ResourceSlot slot : freeSlots.values()) {
+                               if 
(slot.isMatchingRequirement(request.getResourceProfile())) {
+                                       return slot;
+                               }
+                       }
+                       return null;
+               }
+
+               /**
+                * Choose an arbitrary request (the first in iteration order) whose 
requirement the offered slot can match
+                *
+                * @param offeredSlot     The free slot
+                * @param pendingRequests All the pending slot requests
+                * @return The chosen request or null if cannot 
find a match
+                */
+               @Override
+               protected SlotRequest chooseRequestToFulfill(ResourceSlot 
offeredSlot,
+                       Map<AllocationID, SlotRequest> pendingRequests)
+               {
+                       for (Map.Entry<AllocationID, SlotRequest> 
pendingRequest : pendingRequests.entrySet()) {
+                               if 
(offeredSlot.isMatchingRequirement(pendingRequest.getValue().getResourceProfile()))
 {
+                                       return pendingRequest.getValue();
+                               }
+                       }
+                       return null;
+               }
+
+               @Override
+               protected void allocateContainer(ResourceProfile 
resourceProfile) {
+                       allocatedContainers.add(resourceProfile);
+               }
+
+               List<ResourceProfile> getAllocatedContainers() {
+                       return allocatedContainers;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f12ba3e/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 2790cf8..f55069e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -21,28 +21,14 @@ package org.apache.flink.runtime.rpc.akka;
 import akka.actor.ActorSystem;
 import akka.util.Timeout;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.NonHaServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
-import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
 import org.junit.Test;
 
-import org.mockito.Mockito;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class AkkaRpcServiceTest extends TestLogger {

http://git-wip-us.apache.org/repos/asf/flink/blob/2f12ba3e/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/RetryingRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/RetryingRegistrationTest.java
deleted file mode 100644
index 9508825..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/RetryingRegistrationTest.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.registration;
-
-import akka.dispatch.Futures;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import org.slf4j.LoggerFactory;
-
-import scala.concurrent.Await;
-import scala.concurrent.ExecutionContext$;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeoutException;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-/**
- * Tests for the generic retrying registration class, validating the failure, 
retry, and back-off behavior.
- */
-public class RetryingRegistrationTest extends TestLogger {
-
-       @Test
-       public void testSimpleSuccessfulRegistration() throws Exception {
-               final String testId = "laissez les bon temps roulez";
-               final String testEndpointAddress = "<test-address>";
-               final UUID leaderId = UUID.randomUUID();
-
-               // an endpoint that immediately returns success
-               TestRegistrationGateway testGateway = new 
TestRegistrationGateway(new TestRegistrationSuccess(testId));
-               TestingRpcService rpc = new TestingRpcService();
-
-               try {
-                       rpc.registerGateway(testEndpointAddress, testGateway);
-
-                       TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
-                       registration.startRegistration();
-
-                       Future<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
-                       assertNotNull(future);
-
-                       // multiple accesses return the same future
-                       assertEquals(future, registration.getFuture());
-
-                       Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success = 
-                                       Await.result(future, new 
FiniteDuration(10, SECONDS));
-
-                       // validate correct invocation and result
-                       assertEquals(testId, success.f1.getCorrelationId());
-                       assertEquals(leaderId, 
testGateway.getInvocations().take().leaderId());
-               }
-               finally {
-                       testGateway.stop();
-                       rpc.stopService();
-               }
-       }
-       
-       @Test
-       public void testPropagateFailures() throws Exception {
-               final String testExceptionMessage = "testExceptionMessage";
-
-               // RPC service that fails with exception upon the connection
-               RpcService rpc = mock(RpcService.class);
-               when(rpc.connect(anyString(), any(Class.class))).thenThrow(new 
RuntimeException(testExceptionMessage));
-
-               TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, "testaddress", UUID.randomUUID());
-               registration.startRegistration();
-
-               Future<?> future = registration.getFuture();
-               assertTrue(future.failed().isCompleted());
-
-               assertEquals(testExceptionMessage, 
future.failed().value().get().get().getMessage());
-       }
-
-       @Test
-       public void testRetryConnectOnFailure() throws Exception {
-               final String testId = "laissez les bon temps roulez";
-               final UUID leaderId = UUID.randomUUID();
-
-               ExecutorService executor = Executors.newCachedThreadPool();
-               TestRegistrationGateway testGateway = new 
TestRegistrationGateway(new TestRegistrationSuccess(testId));
-
-               try {
-                       // RPC service that fails upon the first connection, 
but succeeds on the second
-                       RpcService rpc = mock(RpcService.class);
-                       when(rpc.connect(anyString(), 
any(Class.class))).thenReturn(
-                                       Futures.failed(new Exception("test 
connect failure")),  // first connection attempt fails
-                                       Futures.successful(testGateway)         
                // second connection attempt succeeds
-                       );
-                       
when(rpc.getExecutionContext()).thenReturn(ExecutionContext$.MODULE$.fromExecutor(executor));
-
-                       TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, "foobar address", leaderId);
-                       registration.startRegistration();
-
-                       Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success =
-                                       Await.result(registration.getFuture(), 
new FiniteDuration(10, SECONDS));
-
-                       // validate correct invocation and result
-                       assertEquals(testId, success.f1.getCorrelationId());
-                       assertEquals(leaderId, 
testGateway.getInvocations().take().leaderId());
-               }
-               finally {
-                       testGateway.stop();
-                       executor.shutdown();
-               }
-       }
-
-       @Test
-       public void testRetriesOnTimeouts() throws Exception {
-               final String testId = "rien ne va plus";
-               final String testEndpointAddress = "<test-address>";
-               final UUID leaderId = UUID.randomUUID();
-
-               // an endpoint that immediately returns futures with timeouts 
before returning a successful future
-               TestRegistrationGateway testGateway = new 
TestRegistrationGateway(
-                               null, // timeout
-                               null, // timeout
-                               new TestRegistrationSuccess(testId) // success
-               );
-
-               TestingRpcService rpc = new TestingRpcService();
-
-               try {
-                       rpc.registerGateway(testEndpointAddress, testGateway);
-       
-                       TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
-       
-                       long started = System.nanoTime();
-                       registration.startRegistration();
-       
-                       Future<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
-                       Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success =
-                                       Await.result(future, new 
FiniteDuration(10, SECONDS));
-       
-                       long finished = System.nanoTime();
-                       long elapsedMillis = (finished - started) / 1000000;
-       
-                       // validate correct invocation and result
-                       assertEquals(testId, success.f1.getCorrelationId());
-                       assertEquals(leaderId, 
testGateway.getInvocations().take().leaderId());
-       
-                       // validate that some retry-delay / back-off behavior 
happened
-                       assertTrue("retries did not properly back off", 
elapsedMillis >= 3 * TestRetryingRegistration.INITIAL_TIMEOUT);
-               }
-               finally {
-                       rpc.stopService();
-                       testGateway.stop();
-               }
-       }
-
-       @Test
-       public void testDecline() throws Exception {
-               final String testId = "qui a coupe le fromage";
-               final String testEndpointAddress = "<test-address>";
-               final UUID leaderId = UUID.randomUUID();
-
-               TestingRpcService rpc = new TestingRpcService();
-
-               TestRegistrationGateway testGateway = new 
TestRegistrationGateway(
-                               null, // timeout
-                               new RegistrationResponse.Decline("no reason "),
-                               null, // timeout
-                               new TestRegistrationSuccess(testId) // success
-               );
-
-               try {
-                       rpc.registerGateway(testEndpointAddress, testGateway);
-
-                       TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
-
-                       long started = System.nanoTime();
-                       registration.startRegistration();
-       
-                       Future<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
-                       Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success =
-                                       Await.result(future, new 
FiniteDuration(10, SECONDS));
-
-                       long finished = System.nanoTime();
-                       long elapsedMillis = (finished - started) / 1000000;
-
-                       // validate correct invocation and result
-                       assertEquals(testId, success.f1.getCorrelationId());
-                       assertEquals(leaderId, 
testGateway.getInvocations().take().leaderId());
-
-                       // validate that some retry-delay / back-off behavior 
happened
-                       assertTrue("retries did not properly back off", 
elapsedMillis >= 
-                                       2 * 
TestRetryingRegistration.INITIAL_TIMEOUT + 
TestRetryingRegistration.DELAY_ON_DECLINE);
-               }
-               finally {
-                       testGateway.stop();
-                       rpc.stopService();
-               }
-       }
-       
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testRetryOnError() throws Exception {
-               final String testId = "Petit a petit, l'oiseau fait son nid";
-               final String testEndpointAddress = "<test-address>";
-               final UUID leaderId = UUID.randomUUID();
-
-               TestingRpcService rpc = new TestingRpcService();
-
-               try {
-                       // gateway that upon calls first responds with a 
failure, then with a success
-                       TestRegistrationGateway testGateway = 
mock(TestRegistrationGateway.class);
-
-                       when(testGateway.registrationCall(any(UUID.class), 
anyLong())).thenReturn(
-                                       
Futures.<RegistrationResponse>failed(new Exception("test exception")),
-                                       
Futures.<RegistrationResponse>successful(new TestRegistrationSuccess(testId)));
-                       
-                       rpc.registerGateway(testEndpointAddress, testGateway);
-
-                       TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
-
-                       long started = System.nanoTime();
-                       registration.startRegistration();
-
-                       Future<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
-                       Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success =
-                                       Await.result(future, new 
FiniteDuration(10, SECONDS));
-
-                       long finished = System.nanoTime();
-                       long elapsedMillis = (finished - started) / 1000000;
-                       
-                       assertEquals(testId, success.f1.getCorrelationId());
-
-                       // validate that some retry-delay / back-off behavior 
happened
-                       assertTrue("retries did not properly back off",
-                                       elapsedMillis >= 
TestRetryingRegistration.DELAY_ON_ERROR);
-               }
-               finally {
-                       rpc.stopService();
-               }
-       }
-
-       @Test
-       public void testCancellation() throws Exception {
-               final String testEndpointAddress = "my-test-address";
-               final UUID leaderId = UUID.randomUUID();
-
-               TestingRpcService rpc = new TestingRpcService();
-
-               try {
-                       Promise<RegistrationResponse> result = 
Futures.promise();
-
-                       TestRegistrationGateway testGateway = 
mock(TestRegistrationGateway.class);
-                       when(testGateway.registrationCall(any(UUID.class), 
anyLong())).thenReturn(result.future());
-
-                       rpc.registerGateway(testEndpointAddress, testGateway);
-
-                       TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
-                       registration.startRegistration();
-
-                       // cancel and fail the current registration attempt
-                       registration.cancel();
-                       result.failure(new TimeoutException());
-
-                       // there should not be a second registration attempt
-                       verify(testGateway, 
atMost(1)).registrationCall(any(UUID.class), anyLong());
-               }
-               finally {
-                       rpc.stopService();
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  test registration
-       // 
------------------------------------------------------------------------
-
-       private static class TestRegistrationSuccess extends 
RegistrationResponse.Success {
-               private static final long serialVersionUID = 
5542698790917150604L;
-
-               private final String correlationId;
-
-               private TestRegistrationSuccess(String correlationId) {
-                       this.correlationId = correlationId;
-               }
-
-               public String getCorrelationId() {
-                       return correlationId;
-               }
-       }
-
-       private static class TestRetryingRegistration extends 
RetryingRegistration<TestRegistrationGateway, TestRegistrationSuccess> {
-
-               // we use shorter timeouts here to speed up the tests
-               static final long INITIAL_TIMEOUT = 20;
-               static final long MAX_TIMEOUT = 200;
-               static final long DELAY_ON_ERROR = 200;
-               static final long DELAY_ON_DECLINE = 200;
-
-               public TestRetryingRegistration(RpcService rpc, String 
targetAddress, UUID leaderId) {
-                       
super(LoggerFactory.getLogger(RetryingRegistrationTest.class),
-                                       rpc, "TestEndpoint",
-                                       TestRegistrationGateway.class,
-                                       targetAddress, leaderId,
-                                       INITIAL_TIMEOUT, MAX_TIMEOUT, 
DELAY_ON_ERROR, DELAY_ON_DECLINE);
-               }
-
-               @Override
-               protected Future<RegistrationResponse> invokeRegistration(
-                               TestRegistrationGateway gateway, UUID leaderId, 
long timeoutMillis) {
-                       return gateway.registrationCall(leaderId, 
timeoutMillis);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f12ba3e/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/TestRegistrationGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/TestRegistrationGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/TestRegistrationGateway.java
deleted file mode 100644
index a049e48..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/registration/TestRegistrationGateway.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.registration;
-
-import akka.dispatch.Futures;
-
-import org.apache.flink.runtime.rpc.TestingGatewayBase;
-import org.apache.flink.util.Preconditions;
-
-import scala.concurrent.Future;
-
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-public class TestRegistrationGateway extends TestingGatewayBase {
-
-       private final BlockingQueue<RegistrationCall> invocations;
-
-       private final RegistrationResponse[] responses;
-
-       private int pos;
-
-       public TestRegistrationGateway(RegistrationResponse... responses) {
-               Preconditions.checkArgument(responses != null && 
responses.length > 0);
-
-               this.invocations = new LinkedBlockingQueue<>();
-               this.responses = responses;
-               
-       }
-
-       // 
------------------------------------------------------------------------
-
-       public Future<RegistrationResponse> registrationCall(UUID leaderId, 
long timeout) {
-               invocations.add(new RegistrationCall(leaderId, timeout));
-
-               RegistrationResponse response = responses[pos];
-               if (pos < responses.length - 1) {
-                       pos++;
-               }
-
-               // return a completed future (for a proper value), or one that 
never completes and will time out (for null)
-               return response != null ? Futures.successful(response) : 
this.<RegistrationResponse>futureWithTimeout(timeout);
-       }
-
-       public BlockingQueue<RegistrationCall> getInvocations() {
-               return invocations;
-       }
-
-       // 
------------------------------------------------------------------------
-
-       public static class RegistrationCall {
-               private final UUID leaderId;
-               private final long timeout;
-
-               public RegistrationCall(UUID leaderId, long timeout) {
-                       this.leaderId = leaderId;
-                       this.timeout = timeout;
-               }
-
-               public UUID leaderId() {
-                       return leaderId;
-               }
-
-               public long timeout() {
-                       return timeout;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f12ba3e/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
deleted file mode 100644
index dfffeda..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.resourcemanager;
-
-import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.rpc.MainThreadExecutor;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.StartStoppable;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.UUID;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doCallRealMethod;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * resourceManager HA test, including grant leadership and revoke leadership
- */
-public class ResourceManagerHATest {
-
-       @Test
-       public void testGrantAndRevokeLeadership() throws Exception {
-               // mock a RpcService which will return a special RpcGateway 
when call its startServer method, the returned RpcGateway directly execute 
runAsync call
-               TestingResourceManagerGatewayProxy gateway = 
mock(TestingResourceManagerGatewayProxy.class);
-               doCallRealMethod().when(gateway).runAsync(any(Runnable.class));
-
-               RpcService rpcService = mock(RpcService.class);
-               
when(rpcService.startServer(any(RpcEndpoint.class))).thenReturn(gateway);
-
-               TestingLeaderElectionService leaderElectionService = new 
TestingLeaderElectionService();
-               TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
-               
highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
-
-               final ResourceManager resourceManager = new 
ResourceManager(rpcService, highAvailabilityServices);
-               resourceManager.start();
-               // before grant leadership, resourceManager's leaderId is null
-               Assert.assertNull(resourceManager.getLeaderSessionID());
-               final UUID leaderId = UUID.randomUUID();
-               leaderElectionService.isLeader(leaderId);
-               // after grant leadership, resourceManager's leaderId has value
-               Assert.assertEquals(leaderId, 
resourceManager.getLeaderSessionID());
-               // then revoke leadership, resourceManager's leaderId is null 
again
-               leaderElectionService.notLeader();
-               Assert.assertNull(resourceManager.getLeaderSessionID());
-       }
-
-       private static abstract class TestingResourceManagerGatewayProxy 
implements MainThreadExecutor, StartStoppable, RpcGateway {
-               @Override
-               public void runAsync(Runnable runnable) {
-                       runnable.run();
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f12ba3e/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
deleted file mode 100644
index 25a670c..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.taskexecutor;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.highavailability.NonHaServices;
-import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
-import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.UUID;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-public class TaskExecutorTest extends TestLogger {
-
-       @Test
-       public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
-               final ResourceID resourceID = ResourceID.generate();
-               final String resourceManagerAddress = 
"/resource/manager/address/one";
-
-               final TestingRpcService rpc = new TestingRpcService();
-               try {
-                       // register a mock resource manager gateway
-                       ResourceManagerGateway rmGateway = 
mock(ResourceManagerGateway.class);
-                       rpc.registerGateway(resourceManagerAddress, rmGateway);
-
-                       NonHaServices haServices = new 
NonHaServices(resourceManagerAddress);
-                       TaskExecutor taskManager = 
TaskExecutor.startTaskManagerComponentsAndActor(
-                               new Configuration(), resourceID, rpc, 
"localhost", haServices, true);
-                       String taskManagerAddress = taskManager.getAddress();
-                       taskManager.start();
-
-                       verify(rmGateway, timeout(5000)).registerTaskExecutor(
-                                       any(UUID.class), 
eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
-               }
-               finally {
-                       rpc.stopService();
-               }
-       }
-
-       @Test
-       public void testTriggerRegistrationOnLeaderChange() throws Exception {
-               final ResourceID resourceID = ResourceID.generate();
-
-               final String address1 = "/resource/manager/address/one";
-               final String address2 = "/resource/manager/address/two";
-               final UUID leaderId1 = UUID.randomUUID();
-               final UUID leaderId2 = UUID.randomUUID();
-
-               final TestingRpcService rpc = new TestingRpcService();
-               try {
-                       // register the mock resource manager gateways
-                       ResourceManagerGateway rmGateway1 = 
mock(ResourceManagerGateway.class);
-                       ResourceManagerGateway rmGateway2 = 
mock(ResourceManagerGateway.class);
-                       rpc.registerGateway(address1, rmGateway1);
-                       rpc.registerGateway(address2, rmGateway2);
-
-                       TestingLeaderRetrievalService testLeaderService = new 
TestingLeaderRetrievalService();
-
-                       TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
-                       
haServices.setResourceManagerLeaderRetriever(testLeaderService);
-
-                       TaskExecutor taskManager = 
TaskExecutor.startTaskManagerComponentsAndActor(
-                               new Configuration(), resourceID, rpc, 
"localhost", haServices, true);
-                       String taskManagerAddress = taskManager.getAddress();
-                       taskManager.start();
-
-                       // no connection initially, since there is no leader
-                       assertNull(taskManager.getResourceManagerConnection());
-
-                       // define a leader and see that a registration happens
-                       testLeaderService.notifyListener(address1, leaderId1);
-
-                       verify(rmGateway1, timeout(5000)).registerTaskExecutor(
-                                       eq(leaderId1), eq(taskManagerAddress), 
eq(resourceID), any(FiniteDuration.class));
-                       
assertNotNull(taskManager.getResourceManagerConnection());
-
-                       // cancel the leader 
-                       testLeaderService.notifyListener(null, null);
-
-                       // set a new leader, see that a registration happens 
-                       testLeaderService.notifyListener(address2, leaderId2);
-
-                       verify(rmGateway2, timeout(5000)).registerTaskExecutor(
-                                       eq(leaderId2), eq(taskManagerAddress), 
eq(resourceID), any(FiniteDuration.class));
-                       
assertNotNull(taskManager.getResourceManagerConnection());
-               }
-               finally {
-                       rpc.stopService();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f12ba3e/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
new file mode 100644
index 0000000..a8d5bd7
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class TaskExecutorTest extends TestLogger {
+
+       @Test
+       public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
+               final ResourceID resourceID = ResourceID.generate();
+               final String resourceManagerAddress = "/resource/manager/address/one";
+
+               final TestingRpcService rpc = new TestingRpcService();
+               try {
+                       // register a mock resource manager gateway
+                       ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
+                       rpc.registerGateway(resourceManagerAddress, rmGateway);
+
+                       NonHaServices haServices = new NonHaServices(resourceManagerAddress);
+                       TaskExecutor taskManager = TaskExecutor.startTaskManagerComponentsAndActor(
+                               new Configuration(), resourceID, rpc, "localhost", haServices, true);
+                       String taskManagerAddress = taskManager.getAddress();
+                       taskManager.start();
+
+                       verify(rmGateway, timeout(5000)).registerTaskExecutor(
+                                       any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
+               }
+               finally {
+                       rpc.stopService();
+               }
+       }
+
+       @Test
+       public void testTriggerRegistrationOnLeaderChange() throws Exception {
+               final ResourceID resourceID = ResourceID.generate();
+
+               final String address1 = "/resource/manager/address/one";
+               final String address2 = "/resource/manager/address/two";
+               final UUID leaderId1 = UUID.randomUUID();
+               final UUID leaderId2 = UUID.randomUUID();
+
+               final TestingRpcService rpc = new TestingRpcService();
+               try {
+                       // register the mock resource manager gateways
+                       ResourceManagerGateway rmGateway1 = mock(ResourceManagerGateway.class);
+                       ResourceManagerGateway rmGateway2 = mock(ResourceManagerGateway.class);
+                       rpc.registerGateway(address1, rmGateway1);
+                       rpc.registerGateway(address2, rmGateway2);
+
+                       TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService();
+
+                       TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+                       haServices.setResourceManagerLeaderRetriever(testLeaderService);
+
+                       TaskExecutor taskManager = TaskExecutor.startTaskManagerComponentsAndActor(
+                               new Configuration(), resourceID, rpc, "localhost", haServices, true);
+                       String taskManagerAddress = taskManager.getAddress();
+                       taskManager.start();
+
+                       // no connection initially, since there is no leader
+                       assertNull(taskManager.getResourceManagerConnection());
+
+                       // define a leader and see that a registration happens
+                       testLeaderService.notifyListener(address1, leaderId1);
+
+                       verify(rmGateway1, timeout(5000)).registerTaskExecutor(
+                                       eq(leaderId1), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
+                       assertNotNull(taskManager.getResourceManagerConnection());
+
+                       // cancel the leader 
+                       testLeaderService.notifyListener(null, null);
+
+                       // set a new leader, see that a registration happens 
+                       testLeaderService.notifyListener(address2, leaderId2);
+
+                       verify(rmGateway2, timeout(5000)).registerTaskExecutor(
+                                       eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(FiniteDuration.class));
+                       assertNotNull(taskManager.getResourceManagerConnection());
+               }
+               finally {
+                       rpc.stopService();
+               }
+       }
+}

Reply via email to