This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 22a96442d911ee1d35b59abeebfc16fd8b0a9542 Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Tue Jul 30 14:54:19 2019 +0200 [hotfix] Remove polling loops from StandaloneResourceManagerTest --- .../StandaloneResourceManagerTest.java | 61 +++++----------------- 1 file changed, 14 insertions(+), 47 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java index 89b6fa6..39d06a0 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.resourcemanager; -import org.apache.flink.api.common.time.Deadline; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.entrypoint.ClusterInformation; @@ -35,18 +34,17 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.TestingRpcServiceResource; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.TestLogger; -import org.apache.flink.util.function.SupplierWithException; import org.junit.ClassRule; import org.junit.Test; -import java.time.Duration; import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.core.IsNull.nullValue; import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; /** * Tests for the Standalone Resource Manager. @@ -56,7 +54,7 @@ public class StandaloneResourceManagerTest extends TestLogger { @ClassRule public static final TestingRpcServiceResource RPC_SERVICE = new TestingRpcServiceResource(); - private static final Time TIMEOUT = Time.seconds(10); + private static final Time TIMEOUT = Time.seconds(10L); private final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler(); @@ -64,14 +62,12 @@ public class StandaloneResourceManagerTest extends TestLogger { public void testStartupPeriod() throws Exception { final LinkedBlockingQueue<Boolean> setFailUnfulfillableRequestInvokes = new LinkedBlockingQueue<>(); final SlotManager slotManager = new TestingSlotManagerFactory() - .setSetFailUnfulfillableRequestConsumer(invoke -> setFailUnfulfillableRequestInvokes.add(invoke)) + .setSetFailUnfulfillableRequestConsumer(setFailUnfulfillableRequestInvokes::add) .createSlotManager(); final TestingStandaloneResourceManager rm = createResourceManager(Time.milliseconds(1L), slotManager); - final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(1)); - assertHappensUntil(() -> setFailUnfulfillableRequestInvokes.size() == 2, deadline); - assertThat(setFailUnfulfillableRequestInvokes.poll(), is(false)); - assertThat(setFailUnfulfillableRequestInvokes.poll(), is(true)); + assertThat(setFailUnfulfillableRequestInvokes.take(), is(false)); + assertThat(setFailUnfulfillableRequestInvokes.take(), is(true)); rm.close(); } @@ -80,14 +76,12 @@ public class StandaloneResourceManagerTest extends TestLogger { public void testNoStartupPeriod() throws Exception { final LinkedBlockingQueue<Boolean> setFailUnfulfillableRequestInvokes = new LinkedBlockingQueue<>(); final SlotManager slotManager = new TestingSlotManagerFactory() - .setSetFailUnfulfillableRequestConsumer(invoke -> setFailUnfulfillableRequestInvokes.add(invoke)) + .setSetFailUnfulfillableRequestConsumer(setFailUnfulfillableRequestInvokes::add) .createSlotManager(); final TestingStandaloneResourceManager rm = createResourceManager(Time.milliseconds(-1L), slotManager); - final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(1)); - assertNotHappens(() -> setFailUnfulfillableRequestInvokes.size() > 1, deadline); - assertThat(setFailUnfulfillableRequestInvokes.size(), is(1)); - assertThat(setFailUnfulfillableRequestInvokes.poll(), is(false)); + assertThat(setFailUnfulfillableRequestInvokes.take(), is(false)); + assertThat(setFailUnfulfillableRequestInvokes.poll(50L, TimeUnit.MILLISECONDS), is(nullValue())); rm.close(); } @@ -96,22 +90,18 @@ public class StandaloneResourceManagerTest extends TestLogger { public void testStartUpPeriodAfterLeadershipSwitch() throws Exception { final LinkedBlockingQueue<Boolean> setFailUnfulfillableRequestInvokes = new LinkedBlockingQueue<>(); final SlotManager slotManager = new TestingSlotManagerFactory() - .setSetFailUnfulfillableRequestConsumer(invoke -> setFailUnfulfillableRequestInvokes.add(invoke)) + .setSetFailUnfulfillableRequestConsumer(setFailUnfulfillableRequestInvokes::add) .createSlotManager(); final TestingStandaloneResourceManager rm = createResourceManager(Time.milliseconds(1L), slotManager); - final Deadline deadline1 = Deadline.fromNow(Duration.ofSeconds(1)); - assertHappensUntil(() -> setFailUnfulfillableRequestInvokes.size() == 2, deadline1); - assertThat(setFailUnfulfillableRequestInvokes.poll(), is(false)); - assertThat(setFailUnfulfillableRequestInvokes.poll(), is(true)); + assertThat(setFailUnfulfillableRequestInvokes.take(), is(false)); + assertThat(setFailUnfulfillableRequestInvokes.take(), is(true)); rm.rmServices.revokeLeadership(); rm.rmServices.grantLeadership(); - final Deadline deadline2 = Deadline.fromNow(Duration.ofSeconds(1L)); - assertHappensUntil(() -> setFailUnfulfillableRequestInvokes.size() == 2, deadline2); - assertThat(setFailUnfulfillableRequestInvokes.poll(), is(false)); - assertThat(setFailUnfulfillableRequestInvokes.poll(), is(true)); + assertThat(setFailUnfulfillableRequestInvokes.take(), is(false)); + assertThat(setFailUnfulfillableRequestInvokes.take(), is(true)); } private TestingStandaloneResourceManager createResourceManager(Time startupPeriod, SlotManager slotManager) throws Exception { @@ -142,29 +132,6 @@ public class StandaloneResourceManagerTest extends TestLogger { return rm; } - private static void assertHappensUntil( - SupplierWithException<Boolean, InterruptedException> condition, - Deadline until) throws InterruptedException { - while (!condition.get()) { - if (!until.hasTimeLeft()) { - fail("condition was not fulfilled before the deadline"); - } - Thread.sleep(2); - } - } - - private static void assertNotHappens( - SupplierWithException<Boolean, InterruptedException> condition, - Deadline until) throws InterruptedException { - while (!condition.get()) { - if (!until.hasTimeLeft()) { - return; - } - Thread.sleep(2); - } - fail("condition was fulfilled before the deadline"); - } - private static class TestingStandaloneResourceManager extends StandaloneResourceManager { private final MockResourceManagerRuntimeServices rmServices;