[FLINK-7871] [flip6] Add ManualClock for SlotPool slot release tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f429b4cd Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f429b4cd Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f429b4cd Branch: refs/heads/master Commit: f429b4cdeef7cce07341c1d0e8cdf23a89399dc7 Parents: 0587bf8 Author: Till Rohrmann <trohrm...@apache.org> Authored: Mon Feb 12 11:39:17 2018 +0100 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Mon Feb 12 12:24:52 2018 +0100 ---------------------------------------------------------------------- .../runtime/jobmaster/slotpool/SlotPool.java | 127 ++++++++----------- .../jobmaster/slotpool/SlotPoolTest.java | 70 +++++----- .../flink/runtime/util/clock/ManualClock.java | 49 +++++++ 3 files changed, 138 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f429b4cd/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java index ca492b5..12a6012 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import 
org.apache.flink.runtime.clusterframework.types.ResourceProfile; @@ -87,20 +88,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedSlotActions { - // ------------------------------------------------------------------------ - - private static final Time DEFAULT_SLOT_REQUEST_TIMEOUT = Time.minutes(10); - - private static final Time DEFAULT_RM_ALLOCATION_TIMEOUT = Time.minutes(5); - - private static final Time DEFAULT_TIMEOUT = Time.seconds(10); - - private static final Time DEFAULT_IDLE_SLOT_TIMEOUT = Time.minutes(5); - - private static final int NUM_RELEASE_SLOT_TRIES = 3; - - // ------------------------------------------------------------------------ - private final JobID jobId; private final ProviderAndOwner providerAndOwner; @@ -150,10 +137,10 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS rpcService, jobId, SystemClock.getInstance(), - DEFAULT_SLOT_REQUEST_TIMEOUT, - DEFAULT_RM_ALLOCATION_TIMEOUT, - DEFAULT_TIMEOUT, - DEFAULT_IDLE_SLOT_TIMEOUT); + Time.milliseconds(JobManagerOptions.SLOT_REQUEST_TIMEOUT.defaultValue()), + Time.milliseconds(JobManagerOptions.SLOT_ALLOCATION_RM_TIMEOUT.defaultValue()), + Time.milliseconds(JobManagerOptions.SLOT_REQUEST_RM_TIMEOUT.defaultValue()), + Time.milliseconds(JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue())); } public SlotPool( @@ -214,7 +201,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS throw new RuntimeException("This should never happen", e); } - scheduleRunAsync(() -> checkIdleSlot(), idleSlotTimeout); + scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout); } @Override @@ -281,50 +268,6 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS return providerAndOwner; } - /** - * Check the available slots, release the slot that is idle for a long time. 
- */ - protected void checkIdleSlot() { - - // The timestamp in SlotAndTimestamp is relative - final long currentRelativeTimeMillis = clock.relativeTimeMillis(); - - final List<AllocatedSlot> expiredSlots = new ArrayList<>(); - - availableSlots.availableSlots.forEach((allocationID, slotAndTimestamp) -> { - - if (slotAndTimestamp != null && - currentRelativeTimeMillis - slotAndTimestamp.timestamp() > idleSlotTimeout.toMilliseconds()) { - - expiredSlots.add(slotAndTimestamp.slot()); - - } - }); - - for (AllocatedSlot expiredSlot : expiredSlots) { - final AllocationID allocationID = expiredSlot.getAllocationId(); - if (availableSlots.tryRemove(allocationID)) { - - log.info("Releasing idle slot {}.", allocationID); - - final CompletableFuture<Acknowledge> future = FutureUtils.retry( - () -> expiredSlot.getTaskManagerGateway().freeSlot( - allocationID, - new FlinkException("Releasing idle slot " + allocationID), - timeout), - NUM_RELEASE_SLOT_TRIES, - getMainThreadExecutor()); - - future.exceptionally(throwable -> { - log.warn("Releasing idle slot {} failed.", allocationID, throwable); - return null; - }); - } - } - - scheduleRunAsync(() -> checkIdleSlot(), idleSlotTimeout); - } - // ------------------------------------------------------------------------ // Resource Manager Connection // ------------------------------------------------------------------------ @@ -715,8 +658,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS final ResourceManagerGateway resourceManagerGateway, final PendingRequest pendingRequest) { - Preconditions.checkNotNull(resourceManagerGateway); - Preconditions.checkNotNull(pendingRequest); + checkNotNull(resourceManagerGateway); + checkNotNull(pendingRequest); log.info("Requesting slot with profile {} from resource manager (request = {}).", pendingRequest.getResourceProfile(), pendingRequest.getSlotRequestId()); @@ -755,12 +698,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS 
private void slotRequestToResourceManagerSuccess(final SlotRequestId requestId) { // a request is pending from the ResourceManager to a (future) TaskManager // we only add the watcher here in case that request times out - scheduleRunAsync(new Runnable() { - @Override - public void run() { - checkTimeoutSlotAllocation(requestId); - } - }, resourceManagerAllocationTimeout); + scheduleRunAsync(() -> checkTimeoutSlotAllocation(requestId), resourceManagerAllocationTimeout); } private void slotRequestToResourceManagerFailed(SlotRequestId slotRequestID, Throwable failure) { @@ -1148,6 +1086,48 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS } /** + * Check the available slots, release the slot that is idle for a long time. + */ + private void checkIdleSlot() { + + // The timestamp in SlotAndTimestamp is relative + final long currentRelativeTimeMillis = clock.relativeTimeMillis(); + + final List<AllocatedSlot> expiredSlots = new ArrayList<>(availableSlots.size()); + + for (SlotAndTimestamp slotAndTimestamp : availableSlots.availableSlots.values()) { + if (currentRelativeTimeMillis - slotAndTimestamp.timestamp > idleSlotTimeout.toMilliseconds()) { + expiredSlots.add(slotAndTimestamp.slot); + } + } + + final FlinkException cause = new FlinkException("Releasing idle slot."); + + for (AllocatedSlot expiredSlot : expiredSlots) { + final AllocationID allocationID = expiredSlot.getAllocationId(); + if (availableSlots.tryRemove(allocationID)) { + + log.info("Releasing idle slot {}.", allocationID); + final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot( + allocationID, + cause, + timeout); + + freeSlotFuture.whenCompleteAsync( + (Acknowledge ignored, Throwable throwable) -> { + if (throwable != null) { + log.info("Releasing idle slot {} failed.", allocationID, throwable); + tryFulfillSlotRequestOrMakeAvailable(expiredSlot); + } + }, + getMainThreadExecutor()); + } + } + + 
scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout); + } + + /** * Clear the internal state of the SlotPool. */ private void clear() { @@ -1183,6 +1163,11 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS return waitingForResourceManager; } + @VisibleForTesting + void triggerCheckIdleSlot() { + runAsync(this::checkIdleSlot); + } + // ------------------------------------------------------------------------ // Helper classes // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/f429b4cd/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java index 5c0d661..beb1de9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java @@ -41,7 +41,7 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.runtime.util.clock.SystemClock; +import org.apache.flink.runtime.util.clock.ManualClock; import org.apache.flink.testutils.category.Flip6; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; @@ -62,6 +62,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import 
java.util.concurrent.TimeUnit; @@ -89,6 +90,8 @@ public class SlotPoolTest extends TestLogger { private SimpleAckingTaskManagerGateway taskManagerGateway; + private TestingResourceManagerGateway resourceManagerGateway; + @Before public void setUp() throws Exception { this.rpcService = new TestingRpcService(); @@ -96,6 +99,7 @@ public class SlotPoolTest extends TestLogger { taskManagerLocation = new LocalTaskManagerLocation(); taskManagerGateway = new SimpleAckingTaskManagerGateway(); + resourceManagerGateway = new TestingResourceManagerGateway(); } @After @@ -105,7 +109,6 @@ public class SlotPoolTest extends TestLogger { @Test public void testAllocateSimpleSlot() throws Exception { - final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>(); resourceManagerGateway.setRequestSlotConsumer(slotRequest -> slotRequestFuture.complete(slotRequest)); @@ -145,7 +148,6 @@ public class SlotPoolTest extends TestLogger { @Test public void testAllocationFulfilledByReturnedSlot() throws Exception { - final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); final ArrayBlockingQueue<SlotRequest> slotRequestQueue = new ArrayBlockingQueue<>(2); resourceManagerGateway.setRequestSlotConsumer(slotRequest -> { @@ -215,7 +217,6 @@ public class SlotPoolTest extends TestLogger { @Test public void testAllocateWithFreeSlot() throws Exception { - final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); final CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>(); resourceManagerGateway.setRequestSlotConsumer(slotRequest -> slotRequestFuture.complete(slotRequest)); @@ -273,7 +274,6 @@ public class SlotPoolTest extends TestLogger { @Test public void testOfferSlot() throws Exception { - final TestingResourceManagerGateway resourceManagerGateway = new 
TestingResourceManagerGateway(); final CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>(); resourceManagerGateway.setRequestSlotConsumer(slotRequest -> slotRequestFuture.complete(slotRequest)); @@ -332,7 +332,6 @@ public class SlotPoolTest extends TestLogger { @Test public void testReleaseResource() throws Exception { - final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); final CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>(); resourceManagerGateway.setRequestSlotConsumer(slotRequest -> slotRequestFuture.complete(slotRequest)); @@ -416,7 +415,6 @@ public class SlotPoolTest extends TestLogger { final CompletableFuture<AllocationID> cancelSlotFuture = new CompletableFuture<>(); final CompletableFuture<AllocationID> requestSlotFutureAllocationId = new CompletableFuture<>(); - final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); resourceManagerGateway.setRequestSlotFuture(requestSlotFuture); resourceManagerGateway.setRequestSlotConsumer(slotRequest -> requestSlotFutureAllocationId.complete(slotRequest.getAllocationId())); resourceManagerGateway.setCancelSlotConsumer(allocationID -> cancelSlotFuture.complete(allocationID)); @@ -427,11 +425,7 @@ public class SlotPoolTest extends TestLogger { null); try { - slotPool.start(JobMasterId.generate(), "localhost"); - - final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class); - - slotPoolGateway.connectToResourceManager(resourceManagerGateway); + final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); CompletableFuture<LogicalSlot> slotFuture = slotPoolGateway.allocateSlot( new SlotRequestId(), @@ -472,10 +466,7 @@ public class SlotPoolTest extends TestLogger { public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception { final SlotPool slotPool = new SlotPool(rpcService, jobId); - final JobMasterId 
jobMasterId = JobMasterId.generate(); - final String jobMasterAddress = "foobar"; final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>(); - final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); resourceManagerGateway.setRequestSlotConsumer( (SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId())); @@ -484,17 +475,13 @@ public class SlotPoolTest extends TestLogger { final SlotRequestId slotRequestId2 = new SlotRequestId(); try { - slotPool.start(jobMasterId, jobMasterAddress); - - final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class); + final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); final ScheduledUnit scheduledUnit = new ScheduledUnit( new JobVertexID(), null, null); - slotPoolGateway.connectToResourceManager(resourceManagerGateway); - CompletableFuture<LogicalSlot> slotFuture1 = slotPoolGateway.allocateSlot( slotRequestId1, scheduledUnit, @@ -545,7 +532,6 @@ public class SlotPoolTest extends TestLogger { @Test public void testShutdownReleasesAllSlots() throws Exception { final SlotPool slotPool = new SlotPool(rpcService, jobId); - final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); try { final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); @@ -595,39 +581,49 @@ public class SlotPoolTest extends TestLogger { @Test public void testCheckIdleSlot() throws Exception { + final ManualClock clock = new ManualClock(); final SlotPool slotPool = new SlotPool( rpcService, jobId, - SystemClock.getInstance(), + clock, TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), timeout); try { - final List<AllocationID> freedSlots = new ArrayList<>(); - taskManagerGateway.setFreeSlotConsumer((tuple) -> { - freedSlots.add(tuple.f0); - }); + final BlockingQueue<AllocationID> freedSlots = new 
ArrayBlockingQueue<>(1); + taskManagerGateway.setFreeSlotConsumer((tuple) -> freedSlots.offer(tuple.f0)); + + final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); final AllocationID expiredSlotID = new AllocationID(); final AllocationID freshSlotID = new AllocationID(); + final SlotOffer slotToExpire = new SlotOffer(expiredSlotID, 0, ResourceProfile.UNKNOWN); + final SlotOffer slotToNotExpire = new SlotOffer(freshSlotID, 1, ResourceProfile.UNKNOWN); + + assertThat( + slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get(), + Matchers.is(Acknowledge.get())); + + assertThat( + slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotToExpire).get(), + Matchers.is(true)); + + clock.advanceTime(timeout.toMilliseconds() - 1L, TimeUnit.MILLISECONDS); - slotPool.getAvailableSlots().add(createSlot(expiredSlotID), - SystemClock.getInstance().relativeTimeMillis() - timeout.toMilliseconds() - 1); + assertThat( + slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotToNotExpire).get(), + Matchers.is(true)); - // Add a 10 s floating buffer time. 
- final long floatingTimeBuffer = 10000; - slotPool.getAvailableSlots().add(createSlot(freshSlotID), - SystemClock.getInstance().relativeTimeMillis() - timeout.toMilliseconds() + floatingTimeBuffer); + clock.advanceTime(1L, TimeUnit.MILLISECONDS); - slotPool.checkIdleSlot(); + slotPool.triggerCheckIdleSlot(); - assertEquals(1, freedSlots.size()); - assertEquals(expiredSlotID, freedSlots.get(0)); - assertFalse(slotPool.getAvailableSlots().contains(expiredSlotID)); - assertTrue(slotPool.getAvailableSlots().contains(freshSlotID)); + final AllocationID freedSlot = freedSlots.poll(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + assertThat(freedSlot, Matchers.is(expiredSlotID)); + assertThat(freedSlots.isEmpty(), Matchers.is(true)); } finally { RpcUtils.terminateRpcEndpoint(slotPool, timeout); } http://git-wip-us.apache.org/repos/asf/flink/blob/f429b4cd/flink-runtime/src/test/java/org/apache/flink/runtime/util/clock/ManualClock.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/clock/ManualClock.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/clock/ManualClock.java new file mode 100644 index 0000000..69ae7be --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/clock/ManualClock.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util.clock; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * {@link Clock} implementation which allows to advance time manually. The clock starts at 0 and keeps its time in nanoseconds; the millisecond accessors are derived from that value. + */ +public class ManualClock extends Clock { + + private final AtomicLong currentTime = new AtomicLong(0L); // current time in nanoseconds + + @Override + public long absoluteTimeMillis() { + return TimeUnit.NANOSECONDS.toMillis(currentTime.get()); + } + + @Override + public long relativeTimeMillis() { + return TimeUnit.NANOSECONDS.toMillis(currentTime.get()); + } + + @Override + public long relativeTimeNanos() { + return currentTime.get(); + } + + public void advanceTime(long duration, TimeUnit timeUnit) { + currentTime.addAndGet(timeUnit.toNanos(duration)); + } +}