[FLINK-7871] [flip6] Add ManualClock for SlotPool slot release tests

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f429b4cd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f429b4cd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f429b4cd

Branch: refs/heads/master
Commit: f429b4cdeef7cce07341c1d0e8cdf23a89399dc7
Parents: 0587bf8
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Mon Feb 12 11:39:17 2018 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Mon Feb 12 12:24:52 2018 +0100

----------------------------------------------------------------------
 .../runtime/jobmaster/slotpool/SlotPool.java    | 127 ++++++++-----------
 .../jobmaster/slotpool/SlotPoolTest.java        |  70 +++++-----
 .../flink/runtime/util/clock/ManualClock.java   |  49 +++++++
 3 files changed, 138 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f429b4cd/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index ca492b5..12a6012 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster.slotpool;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -87,20 +88,6 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class SlotPool extends RpcEndpoint implements SlotPoolGateway, 
AllocatedSlotActions {
 
-       // 
------------------------------------------------------------------------
-
-       private static final Time DEFAULT_SLOT_REQUEST_TIMEOUT = 
Time.minutes(10);
-
-       private static final Time DEFAULT_RM_ALLOCATION_TIMEOUT = 
Time.minutes(5);
-
-       private static final Time DEFAULT_TIMEOUT = Time.seconds(10);
-
-       private static final Time DEFAULT_IDLE_SLOT_TIMEOUT = Time.minutes(5);
-
-       private static final int NUM_RELEASE_SLOT_TRIES = 3;
-
-       // 
------------------------------------------------------------------------
-
        private final JobID jobId;
 
        private final ProviderAndOwner providerAndOwner;
@@ -150,10 +137,10 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                        rpcService,
                        jobId,
                        SystemClock.getInstance(),
-                       DEFAULT_SLOT_REQUEST_TIMEOUT,
-                       DEFAULT_RM_ALLOCATION_TIMEOUT,
-                       DEFAULT_TIMEOUT,
-                       DEFAULT_IDLE_SLOT_TIMEOUT);
+                       
Time.milliseconds(JobManagerOptions.SLOT_REQUEST_TIMEOUT.defaultValue()),
+                       
Time.milliseconds(JobManagerOptions.SLOT_ALLOCATION_RM_TIMEOUT.defaultValue()),
+                       
Time.milliseconds(JobManagerOptions.SLOT_REQUEST_RM_TIMEOUT.defaultValue()),
+                       
Time.milliseconds(JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue()));
        }
 
        public SlotPool(
@@ -214,7 +201,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                        throw new RuntimeException("This should never happen", 
e);
                }
 
-               scheduleRunAsync(() -> checkIdleSlot(), idleSlotTimeout);
+               scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
        }
 
        @Override
@@ -281,50 +268,6 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                return providerAndOwner;
        }
 
-       /**
-        * Check the available slots, release the slot that is idle for a long 
time.
-        */
-       protected void checkIdleSlot() {
-
-               // The timestamp in SlotAndTimestamp is relative
-               final long currentRelativeTimeMillis = 
clock.relativeTimeMillis();
-
-               final List<AllocatedSlot> expiredSlots = new ArrayList<>();
-
-               availableSlots.availableSlots.forEach((allocationID, 
slotAndTimestamp) -> {
-
-                       if (slotAndTimestamp != null &&
-                               currentRelativeTimeMillis - 
slotAndTimestamp.timestamp() > idleSlotTimeout.toMilliseconds()) {
-
-                               expiredSlots.add(slotAndTimestamp.slot());
-
-                       }
-               });
-
-               for (AllocatedSlot expiredSlot : expiredSlots) {
-                       final AllocationID allocationID = 
expiredSlot.getAllocationId();
-                       if (availableSlots.tryRemove(allocationID)) {
-
-                               log.info("Releasing idle slot {}.", 
allocationID);
-
-                               final CompletableFuture<Acknowledge> future = 
FutureUtils.retry(
-                                       () -> 
expiredSlot.getTaskManagerGateway().freeSlot(
-                                               allocationID,
-                                               new FlinkException("Releasing 
idle slot " + allocationID),
-                                               timeout),
-                                       NUM_RELEASE_SLOT_TRIES,
-                                       getMainThreadExecutor());
-
-                               future.exceptionally(throwable -> {
-                                       log.warn("Releasing idle slot {} 
failed.", allocationID, throwable);
-                                       return null;
-                               });
-                       }
-               }
-
-               scheduleRunAsync(() -> checkIdleSlot(), idleSlotTimeout);
-       }
-
        // 
------------------------------------------------------------------------
        //  Resource Manager Connection
        // 
------------------------------------------------------------------------
@@ -715,8 +658,8 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                        final ResourceManagerGateway resourceManagerGateway,
                        final PendingRequest pendingRequest) {
 
-               Preconditions.checkNotNull(resourceManagerGateway);
-               Preconditions.checkNotNull(pendingRequest);
+               checkNotNull(resourceManagerGateway);
+               checkNotNull(pendingRequest);
 
                log.info("Requesting slot with profile {} from resource manager 
(request = {}).", pendingRequest.getResourceProfile(), 
pendingRequest.getSlotRequestId());
 
@@ -755,12 +698,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
        private void slotRequestToResourceManagerSuccess(final SlotRequestId 
requestId) {
                // a request is pending from the ResourceManager to a (future) 
TaskManager
                // we only add the watcher here in case that request times out
-               scheduleRunAsync(new Runnable() {
-                       @Override
-                       public void run() {
-                               checkTimeoutSlotAllocation(requestId);
-                       }
-               }, resourceManagerAllocationTimeout);
+               scheduleRunAsync(() -> checkTimeoutSlotAllocation(requestId), 
resourceManagerAllocationTimeout);
        }
 
        private void slotRequestToResourceManagerFailed(SlotRequestId 
slotRequestID, Throwable failure) {
@@ -1148,6 +1086,48 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
        }
 
        /**
+        * Check the available slots, release the slot that is idle for a long 
time.
+        */
+       private void checkIdleSlot() {
+
+               // The timestamp in SlotAndTimestamp is relative
+               final long currentRelativeTimeMillis = 
clock.relativeTimeMillis();
+
+               final List<AllocatedSlot> expiredSlots = new 
ArrayList<>(availableSlots.size());
+
+               for (SlotAndTimestamp slotAndTimestamp : 
availableSlots.availableSlots.values()) {
+                       if (currentRelativeTimeMillis - 
slotAndTimestamp.timestamp > idleSlotTimeout.toMilliseconds()) {
+                               expiredSlots.add(slotAndTimestamp.slot);
+                       }
+               }
+
+               final FlinkException cause = new FlinkException("Releasing idle 
slot.");
+
+               for (AllocatedSlot expiredSlot : expiredSlots) {
+                       final AllocationID allocationID = 
expiredSlot.getAllocationId();
+                       if (availableSlots.tryRemove(allocationID)) {
+
+                               log.info("Releasing idle slot {}.", 
allocationID);
+                               final CompletableFuture<Acknowledge> 
freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
+                                       allocationID,
+                                       cause,
+                                       timeout);
+
+                               freeSlotFuture.whenCompleteAsync(
+                                       (Acknowledge ignored, Throwable 
throwable) -> {
+                                               if (throwable != null) {
+                                                       log.info("Releasing 
idle slot {} failed.", allocationID, throwable);
+                                                       
tryFulfillSlotRequestOrMakeAvailable(expiredSlot);
+                                               }
+                                       },
+                                       getMainThreadExecutor());
+                       }
+               }
+
+               scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
+       }
+
+       /**
         * Clear the internal state of the SlotPool.
         */
        private void clear() {
@@ -1183,6 +1163,11 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                return waitingForResourceManager;
        }
 
+       @VisibleForTesting
+       void triggerCheckIdleSlot() {
+               runAsync(this::checkIdleSlot);
+       }
+
        // 
------------------------------------------------------------------------
        //  Helper classes
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f429b4cd/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
index 5c0d661..beb1de9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
@@ -41,7 +41,7 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.util.clock.SystemClock;
+import org.apache.flink.runtime.util.clock.ManualClock;
 import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -62,6 +62,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -89,6 +90,8 @@ public class SlotPoolTest extends TestLogger {
 
        private SimpleAckingTaskManagerGateway taskManagerGateway;
 
+       private TestingResourceManagerGateway resourceManagerGateway;
+
        @Before
        public void setUp() throws Exception {
                this.rpcService = new TestingRpcService();
@@ -96,6 +99,7 @@ public class SlotPoolTest extends TestLogger {
 
                taskManagerLocation = new LocalTaskManagerLocation();
                taskManagerGateway = new SimpleAckingTaskManagerGateway();
+               resourceManagerGateway = new TestingResourceManagerGateway();
        }
 
        @After
@@ -105,7 +109,6 @@ public class SlotPoolTest extends TestLogger {
 
        @Test
        public void testAllocateSimpleSlot() throws Exception {
-               final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway();
                CompletableFuture<SlotRequest> slotRequestFuture = new 
CompletableFuture<>();
                resourceManagerGateway.setRequestSlotConsumer(slotRequest -> 
slotRequestFuture.complete(slotRequest));
 
@@ -145,7 +148,6 @@ public class SlotPoolTest extends TestLogger {
 
        @Test
        public void testAllocationFulfilledByReturnedSlot() throws Exception {
-               final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway();
                final ArrayBlockingQueue<SlotRequest> slotRequestQueue = new 
ArrayBlockingQueue<>(2);
 
                resourceManagerGateway.setRequestSlotConsumer(slotRequest -> {
@@ -215,7 +217,6 @@ public class SlotPoolTest extends TestLogger {
 
        @Test
        public void testAllocateWithFreeSlot() throws Exception {
-               final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway();
                final CompletableFuture<SlotRequest> slotRequestFuture = new 
CompletableFuture<>();
                resourceManagerGateway.setRequestSlotConsumer(slotRequest -> 
slotRequestFuture.complete(slotRequest));
 
@@ -273,7 +274,6 @@ public class SlotPoolTest extends TestLogger {
 
        @Test
        public void testOfferSlot() throws Exception {
-               final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway();
                final CompletableFuture<SlotRequest> slotRequestFuture = new 
CompletableFuture<>();
 
                resourceManagerGateway.setRequestSlotConsumer(slotRequest -> 
slotRequestFuture.complete(slotRequest));
@@ -332,7 +332,6 @@ public class SlotPoolTest extends TestLogger {
 
        @Test
        public void testReleaseResource() throws Exception {
-               final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway();
                final CompletableFuture<SlotRequest> slotRequestFuture = new 
CompletableFuture<>();
 
                resourceManagerGateway.setRequestSlotConsumer(slotRequest -> 
slotRequestFuture.complete(slotRequest));
@@ -416,7 +415,6 @@ public class SlotPoolTest extends TestLogger {
                final CompletableFuture<AllocationID> cancelSlotFuture = new 
CompletableFuture<>();
                final CompletableFuture<AllocationID> 
requestSlotFutureAllocationId = new CompletableFuture<>();
 
-               final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway();
                resourceManagerGateway.setRequestSlotFuture(requestSlotFuture);
                resourceManagerGateway.setRequestSlotConsumer(slotRequest -> 
requestSlotFutureAllocationId.complete(slotRequest.getAllocationId()));
                resourceManagerGateway.setCancelSlotConsumer(allocationID -> 
cancelSlotFuture.complete(allocationID));
@@ -427,11 +425,7 @@ public class SlotPoolTest extends TestLogger {
                        null);
 
                try {
-                       slotPool.start(JobMasterId.generate(), "localhost");
-
-                       final SlotPoolGateway slotPoolGateway = 
slotPool.getSelfGateway(SlotPoolGateway.class);
-
-                       
slotPoolGateway.connectToResourceManager(resourceManagerGateway);
+                       final SlotPoolGateway slotPoolGateway = 
setupSlotPool(slotPool, resourceManagerGateway);
 
                        CompletableFuture<LogicalSlot> slotFuture = 
slotPoolGateway.allocateSlot(
                                new SlotRequestId(),
@@ -472,10 +466,7 @@ public class SlotPoolTest extends TestLogger {
        public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws 
Exception {
                final SlotPool slotPool = new SlotPool(rpcService, jobId);
 
-               final JobMasterId jobMasterId = JobMasterId.generate();
-               final String jobMasterAddress = "foobar";
                final CompletableFuture<AllocationID> allocationIdFuture = new 
CompletableFuture<>();
-               final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway();
 
                resourceManagerGateway.setRequestSlotConsumer(
                        (SlotRequest slotRequest) -> 
allocationIdFuture.complete(slotRequest.getAllocationId()));
@@ -484,17 +475,13 @@ public class SlotPoolTest extends TestLogger {
                final SlotRequestId slotRequestId2 = new SlotRequestId();
 
                try {
-                       slotPool.start(jobMasterId, jobMasterAddress);
-
-                       final SlotPoolGateway slotPoolGateway = 
slotPool.getSelfGateway(SlotPoolGateway.class);
+                       final SlotPoolGateway slotPoolGateway = 
setupSlotPool(slotPool, resourceManagerGateway);
 
                        final ScheduledUnit scheduledUnit = new ScheduledUnit(
                                new JobVertexID(),
                                null,
                                null);
 
-                       
slotPoolGateway.connectToResourceManager(resourceManagerGateway);
-
                        CompletableFuture<LogicalSlot> slotFuture1 = 
slotPoolGateway.allocateSlot(
                                slotRequestId1,
                                scheduledUnit,
@@ -545,7 +532,6 @@ public class SlotPoolTest extends TestLogger {
        @Test
        public void testShutdownReleasesAllSlots() throws Exception {
                final SlotPool slotPool = new SlotPool(rpcService, jobId);
-               final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway();
 
                try {
                        final SlotPoolGateway slotPoolGateway = 
setupSlotPool(slotPool, resourceManagerGateway);
@@ -595,39 +581,49 @@ public class SlotPoolTest extends TestLogger {
 
        @Test
        public void testCheckIdleSlot() throws Exception {
+               final ManualClock clock = new ManualClock();
                final SlotPool slotPool = new SlotPool(
                        rpcService,
                        jobId,
-                       SystemClock.getInstance(),
+                       clock,
                        TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime(),
                        timeout);
 
                try {
-                       final List<AllocationID> freedSlots = new ArrayList<>();
-                       taskManagerGateway.setFreeSlotConsumer((tuple) -> {
-                               freedSlots.add(tuple.f0);
-                       });
+                       final BlockingQueue<AllocationID> freedSlots = new 
ArrayBlockingQueue<>(1);
+                       taskManagerGateway.setFreeSlotConsumer((tuple) -> 
freedSlots.offer(tuple.f0));
+
+                       final SlotPoolGateway slotPoolGateway = 
setupSlotPool(slotPool, resourceManagerGateway);
 
                        final AllocationID expiredSlotID = new AllocationID();
                        final AllocationID freshSlotID = new AllocationID();
+                       final SlotOffer slotToExpire = new 
SlotOffer(expiredSlotID, 0, ResourceProfile.UNKNOWN);
+                       final SlotOffer slotToNotExpire = new 
SlotOffer(freshSlotID, 1, ResourceProfile.UNKNOWN);
+
+                       assertThat(
+                               
slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get(),
+                               Matchers.is(Acknowledge.get()));
+
+                       assertThat(
+                               slotPoolGateway.offerSlot(taskManagerLocation, 
taskManagerGateway, slotToExpire).get(),
+                               Matchers.is(true));
+
+                       clock.advanceTime(timeout.toMilliseconds() - 1L, 
TimeUnit.MILLISECONDS);
 
-                       
slotPool.getAvailableSlots().add(createSlot(expiredSlotID),
-                               SystemClock.getInstance().relativeTimeMillis() 
- timeout.toMilliseconds() - 1);
+                       assertThat(
+                               slotPoolGateway.offerSlot(taskManagerLocation, 
taskManagerGateway, slotToNotExpire).get(),
+                               Matchers.is(true));
 
-                       // Add a 10 s floating buffer time.
-                       final long floatingTimeBuffer = 10000;
-                       
slotPool.getAvailableSlots().add(createSlot(freshSlotID),
-                               SystemClock.getInstance().relativeTimeMillis() 
- timeout.toMilliseconds() + floatingTimeBuffer);
+                       clock.advanceTime(1L, TimeUnit.MILLISECONDS);
 
-                       slotPool.checkIdleSlot();
+                       slotPool.triggerCheckIdleSlot();
 
-                       assertEquals(1, freedSlots.size());
-                       assertEquals(expiredSlotID, freedSlots.get(0));
-                       
assertFalse(slotPool.getAvailableSlots().contains(expiredSlotID));
-                       
assertTrue(slotPool.getAvailableSlots().contains(freshSlotID));
+                       final AllocationID freedSlot = 
freedSlots.poll(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
+                       assertThat(freedSlot, Matchers.is(expiredSlotID));
+                       assertThat(freedSlots.isEmpty(), Matchers.is(true));
                } finally {
                        RpcUtils.terminateRpcEndpoint(slotPool, timeout);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/f429b4cd/flink-runtime/src/test/java/org/apache/flink/runtime/util/clock/ManualClock.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/clock/ManualClock.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/clock/ManualClock.java
new file mode 100644
index 0000000..69ae7be
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/clock/ManualClock.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.clock;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * {@link Clock} implementation which allows to advance time manually.
+ */
+public class ManualClock extends Clock {
+
+       private AtomicLong currentTime = new AtomicLong(0L);
+
+       @Override
+       public long absoluteTimeMillis() {
+               return currentTime.get() / 1_000L;
+       }
+
+       @Override
+       public long relativeTimeMillis() {
+               return currentTime.get() / 1_000L;
+       }
+
+       @Override
+       public long relativeTimeNanos() {
+               return currentTime.get();
+       }
+
+       public void advanceTime(long duration, TimeUnit timeUnit) {
+               currentTime.addAndGet(timeUnit.toNanos(duration));
+       }
+}

Reply via email to