[FLINK-8778] Port queryable state ITCases to use MiniClusterResource
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/735c95fb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/735c95fb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/735c95fb Branch: refs/heads/release-1.5 Commit: 735c95fbb93316475dc6dd6daa2aa91a7158c843 Parents: 7ab8ac3 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Mon Feb 26 11:55:14 2018 +0100 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Sun Mar 11 08:31:56 2018 -0700 ---------------------------------------------------------------------- .../itcases/AbstractQueryableStateTestBase.java | 240 ++++++++----------- .../HAAbstractQueryableStateTestBase.java | 93 ------- .../HAQueryableStateFsBackendITCase.java | 90 ++++++- .../HAQueryableStateRocksDBBackendITCase.java | 91 ++++++- .../NonHAAbstractQueryableStateTestBase.java | 75 ------ .../NonHAQueryableStateFsBackendITCase.java | 60 ++++- ...NonHAQueryableStateRocksDBBackendITCase.java | 61 ++++- 7 files changed, 375 insertions(+), 335 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/735c95fb/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java index 623e42b..e99a28b 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java @@ -37,12 +37,15 @@ import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.Configuration; import org.apache.flink.queryablestate.client.QueryableStateClient; import org.apache.flink.queryablestate.client.VoidNamespace; @@ -53,12 +56,9 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess; -import org.apache.flink.runtime.minicluster.FlinkMiniCluster; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; +import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.QueryableStateStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -68,6 +68,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Collector; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; @@ -76,6 +77,7 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -93,11 +95,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongArray; -import scala.concurrent.duration.Deadline; -import scala.concurrent.duration.FiniteDuration; -import scala.reflect.ClassTag$; - +import static org.hamcrest.CoreMatchers.containsString; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -106,7 +106,7 @@ import static org.junit.Assert.fail; */ public abstract class AbstractQueryableStateTestBase extends TestLogger { - private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10000L, TimeUnit.SECONDS); + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(10000L); public static final long RETRY_TIMEOUT = 50L; private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4); @@ -118,27 +118,22 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { protected AbstractStateBackend stateBackend; /** - * Shared between all the test. Make sure to have at least NUM_SLOTS - * available after your test finishes, e.g. cancel the job you submitted. - */ - protected static FlinkMiniCluster cluster; - - /** * Client shared between all the test. */ protected static QueryableStateClient client; + protected static ClusterClient<?> clusterClient; + protected static int maxParallelism; @Before public void setUp() throws Exception { - // NOTE: do not use a shared instance for all tests as the tests may brake + // NOTE: do not use a shared instance for all tests as the tests may break this.stateBackend = createStateBackend(); - Assert.assertNotNull(cluster); + Assert.assertNotNull(clusterClient); - maxParallelism = cluster.configuration().getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) * - cluster.configuration().getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); + maxParallelism = 4; } /** @@ -160,8 +155,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { @Test @SuppressWarnings("unchecked") public void testQueryableState() throws Exception { - - final Deadline deadline = TEST_TIMEOUT.fromNow(); + final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT); final int numKeys = 256; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -188,12 +182,13 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { } }).asQueryableState(queryName, reducingState); - try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) { + try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(deadline, clusterClient, env)) { final JobID jobId = autoCancellableJob.getJobId(); final JobGraph jobGraph = autoCancellableJob.getJobGraph(); - cluster.submitJobDetached(jobGraph); + clusterClient.setDetached(true); + clusterClient.submitJob(jobGraph, AbstractQueryableStateTestBase.class.getClassLoader()); final AtomicLongArray counts = new AtomicLongArray(numKeys); @@ -257,9 +252,8 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { /** * Tests that duplicate query registrations fail the job at the JobManager. */ - @Test + @Test(timeout = 60_000) public void testDuplicateRegistrationFailsJob() throws Exception { - final Deadline deadline = TEST_TIMEOUT.fromNow(); final int numKeys = 256; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -302,54 +296,19 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { // Submit the job graph final JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - final JobID jobId = jobGraph.getJobID(); - - final CompletableFuture<TestingJobManagerMessages.JobStatusIs> failedFuture = - notifyWhenJobStatusIs(jobId, JobStatus.FAILED, deadline); - final CompletableFuture<TestingJobManagerMessages.JobStatusIs> cancellationFuture = - notifyWhenJobStatusIs(jobId, JobStatus.CANCELED, deadline); - - cluster.submitJobDetached(jobGraph); + clusterClient.setDetached(false); + boolean caughtException = false; try { - final TestingJobManagerMessages.JobStatusIs jobStatus = - failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - - assertEquals(JobStatus.FAILED, jobStatus.state()); - } catch (Exception e) { - - // if the assertion fails, it means that the job was (falsely) not cancelled. - // in this case, and given that the mini-cluster is shared with other tests, - // we cancel the job and wait for the cancellation so that the resources are freed. - - if (jobId != null) { - cluster.getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)); - - cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - } - - // and we re-throw the exception. - throw e; + clusterClient.submitJob(jobGraph, AbstractQueryableStateTestBase.class.getClassLoader()); + } catch (ProgramInvocationException e) { + String failureCause = ExceptionUtils.stringifyException(e); + assertThat(failureCause, containsString("KvState with name '" + queryName + "' has already been registered by another operator")); + caughtException = true; } - // Get the job and check the cause - JobManagerMessages.JobFound jobFound = FutureUtils.toJava( - cluster.getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class))) - .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - - String failureCause = jobFound.executionGraph().getFailureInfo().getExceptionAsString(); - - assertEquals(JobStatus.FAILED, jobFound.executionGraph().getState()); - assertTrue("Not instance of SuppressRestartsException", failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException")); - int causedByIndex = failureCause.indexOf("Caused by: "); - String subFailureCause = failureCause.substring(causedByIndex + "Caused by: ".length()); - assertTrue("Not caused by IllegalStateException", subFailureCause.startsWith("java.lang.IllegalStateException")); - assertTrue("Exception does not contain registration name", subFailureCause.contains(queryName)); + assertTrue(caughtException); } /** @@ -360,8 +319,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { */ @Test public void testValueState() throws Exception { - - final Deadline deadline = TEST_TIMEOUT.fromNow(); + final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT); final long numElements = 1024L; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -386,12 +344,13 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { } }).asQueryableState("hakuna", valueState); - try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) { + try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(deadline, clusterClient, env)) { final JobID jobId = autoCancellableJob.getJobId(); final JobGraph jobGraph = autoCancellableJob.getJobGraph(); - cluster.submitJobDetached(jobGraph); + clusterClient.setDetached(true); + clusterClient.submitJob(jobGraph, AbstractQueryableStateTestBase.class.getClassLoader()); executeValueQuery(deadline, client, jobId, "hakuna", valueState, numElements); } @@ -404,8 +363,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { @Test @Ignore public void testWrongJobIdAndWrongQueryableStateName() throws Exception { - - final Deadline deadline = TEST_TIMEOUT.fromNow(); + final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT); final long numElements = 1024L; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -425,18 +383,22 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { } }).asQueryableState("hakuna", valueState); - try (AutoCancellableJob closableJobGraph = new AutoCancellableJob(cluster, env, deadline)) { + try (AutoCancellableJob closableJobGraph = new AutoCancellableJob(deadline, clusterClient, env)) { - // register to be notified when the job is running. - CompletableFuture<TestingJobManagerMessages.JobStatusIs> runningFuture = - notifyWhenJobStatusIs(closableJobGraph.getJobId(), JobStatus.RUNNING, deadline); + clusterClient.setDetached(true); + clusterClient.submitJob( + closableJobGraph.getJobGraph(), AbstractQueryableStateTestBase.class.getClassLoader()); - cluster.submitJobDetached(closableJobGraph.getJobGraph()); + CompletableFuture<JobStatus> jobStatusFuture = + clusterClient.getJobStatus(closableJobGraph.getJobId()); + + while (deadline.hasTimeLeft() && !jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).equals(JobStatus.RUNNING)) { + Thread.sleep(50); + jobStatusFuture = + clusterClient.getJobStatus(closableJobGraph.getJobId()); + } - // expect for the job to be running - TestingJobManagerMessages.JobStatusIs jobStatus = - runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - assertEquals(JobStatus.RUNNING, jobStatus.state()); + assertEquals(JobStatus.RUNNING, jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS)); final JobID wrongJobId = new JobID(); @@ -484,14 +446,13 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { */ @Test public void testQueryNonStartedJobState() throws Exception { - - final Deadline deadline = TEST_TIMEOUT.fromNow(); + final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT); final long numElements = 1024L; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(stateBackend); env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we + // Very important, because clusterClient is shared between tests and we // don't explicitly check that all slots are available before // submitting. env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); @@ -512,7 +473,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { } }).asQueryableState("hakuna", valueState); - try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) { + try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(deadline, clusterClient, env)) { final JobID jobId = autoCancellableJob.getJobId(); final JobGraph jobGraph = autoCancellableJob.getJobGraph(); @@ -527,7 +488,8 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { BasicTypeInfo.INT_TYPE_INFO, valueState); - cluster.submitJobDetached(jobGraph); + clusterClient.setDetached(true); + clusterClient.submitJob(jobGraph, AbstractQueryableStateTestBase.class.getClassLoader()); executeValueQuery(deadline, client, jobId, "hakuna", valueState, expected); } @@ -543,8 +505,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { */ @Test(expected = UnknownKeyOrNamespaceException.class) public void testValueStateDefault() throws Throwable { - - final Deadline deadline = TEST_TIMEOUT.fromNow(); + final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT); final long numElements = 1024L; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -571,12 +532,13 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { } }).asQueryableState("hakuna", valueState); - try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) { + try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(deadline, clusterClient, env)) { final JobID jobId = autoCancellableJob.getJobId(); final JobGraph jobGraph = autoCancellableJob.getJobGraph(); - cluster.submitJobDetached(jobGraph); + clusterClient.setDetached(true); + clusterClient.submitJob(jobGraph, AbstractQueryableStateTestBase.class.getClassLoader()); // Now query int key = 0; @@ -611,8 +573,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { */ @Test public void testValueStateShortcut() throws Exception { - - final Deadline deadline = TEST_TIMEOUT.fromNow(); + final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT); final long numElements = 1024L; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -639,12 +600,14 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { final ValueStateDescriptor<Tuple2<Integer, Long>> stateDesc = (ValueStateDescriptor<Tuple2<Integer, Long>>) queryableState.getStateDescriptor(); - try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) { + try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(deadline, clusterClient, env)) { final JobID jobId = autoCancellableJob.getJobId(); final JobGraph jobGraph = autoCancellableJob.getJobGraph(); - cluster.submitJobDetached(jobGraph); + clusterClient.setDetached(true); + clusterClient.submitJob(jobGraph, AbstractQueryableStateTestBase.class.getClassLoader()); + executeValueQuery(deadline, client, jobId, "matata", stateDesc, numElements); } } @@ -658,8 +621,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { */ @Test public void testFoldingState() throws Exception { - - final Deadline deadline = TEST_TIMEOUT.fromNow(); + final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT); final int numElements = 1024; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -684,12 +646,13 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { } }).asQueryableState("pumba", foldingState); - try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) { + try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(deadline, clusterClient, env)) { final JobID jobId = autoCancellableJob.getJobId(); final JobGraph jobGraph = autoCancellableJob.getJobGraph(); - cluster.submitJobDetached(jobGraph); + clusterClient.setDetached(true); + clusterClient.submitJob(jobGraph, AbstractQueryableStateTestBase.class.getClassLoader()); final String expected = Integer.toString(numElements * (numElements + 1) / 2); @@ -731,8 +694,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { */ @Test public void testReducingState() throws Exception { - - final Deadline deadline = TEST_TIMEOUT.fromNow(); + final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT); final long numElements = 1024L; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -757,12 +719,13 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { } }).asQueryableState("jungle", reducingState); - try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) { + try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(deadline, clusterClient, env)) { final JobID jobId = autoCancellableJob.getJobId(); final JobGraph jobGraph = autoCancellableJob.getJobGraph(); - cluster.submitJobDetached(jobGraph); + clusterClient.setDetached(true); + clusterClient.submitJob(jobGraph, AbstractQueryableStateTestBase.class.getClassLoader()); final long expected = numElements * (numElements + 1L) / 2L; @@ -804,8 +767,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { */ @Test public void testMapState() throws Exception { - - final Deadline deadline = TEST_TIMEOUT.fromNow(); + final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT); final long numElements = 1024L; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -850,12 +812,13 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { } }); - try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) { + try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(deadline, clusterClient, env)) { final JobID jobId = autoCancellableJob.getJobId(); final JobGraph jobGraph = autoCancellableJob.getJobGraph(); - cluster.submitJobDetached(jobGraph); + clusterClient.setDetached(true); + clusterClient.submitJob(jobGraph, AbstractQueryableStateTestBase.class.getClassLoader()); final long expected = numElements * (numElements + 1L) / 2L; @@ -897,8 +860,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { */ @Test public void testListState() throws Exception { - - final Deadline deadline = TEST_TIMEOUT.fromNow(); + final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT); final long numElements = 1024L; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -939,12 +901,13 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { } }); - try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) { + try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(deadline, clusterClient, env)) { final JobID jobId = autoCancellableJob.getJobId(); final JobGraph jobGraph = autoCancellableJob.getJobGraph(); - cluster.submitJobDetached(jobGraph); + clusterClient.setDetached(true); + clusterClient.submitJob(jobGraph, AbstractQueryableStateTestBase.class.getClassLoader()); final Map<Integer, Set<Long>> results = new HashMap<>(); @@ -994,8 +957,7 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { @Test public void testAggregatingState() throws Exception { - - final Deadline deadline = TEST_TIMEOUT.fromNow(); + final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT); final long numElements = 1024L; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -1025,12 +987,13 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { new AggregatingTestOperator(aggrStateDescriptor) ); - try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(cluster, env, deadline)) { + try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(deadline, clusterClient, env)) { final JobID jobId = autoCancellableJob.getJobId(); final JobGraph jobGraph = autoCancellableJob.getJobGraph(); - cluster.submitJobDetached(jobGraph); + clusterClient.setDetached(true); + clusterClient.submitJob(jobGraph, AbstractQueryableStateTestBase.class.getClassLoader()); for (int key = 0; key < maxParallelism; key++) { boolean success = false; @@ -1277,22 +1240,22 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { */ private static class AutoCancellableJob implements AutoCloseable { - private final FlinkMiniCluster cluster; - private final Deadline deadline; + private final ClusterClient<?> clusterClient; private final JobGraph jobGraph; private final JobID jobId; - private final CompletableFuture<TestingJobManagerMessages.JobStatusIs> cancellationFuture; - AutoCancellableJob(final FlinkMiniCluster cluster, final StreamExecutionEnvironment env, final Deadline deadline) { + private final Deadline deadline; + + AutoCancellableJob(Deadline deadline, final ClusterClient<?> clusterClient, final StreamExecutionEnvironment env) { Preconditions.checkNotNull(env); - this.cluster = Preconditions.checkNotNull(cluster); + this.clusterClient = Preconditions.checkNotNull(clusterClient); this.jobGraph = env.getStreamGraph().getJobGraph(); - this.deadline = Preconditions.checkNotNull(deadline); - this.jobId = jobGraph.getJobID(); - this.cancellationFuture = notifyWhenJobStatusIs(jobId, JobStatus.CANCELED, deadline); + this.jobId = Preconditions.checkNotNull(jobGraph.getJobID()); + + this.deadline = deadline; } JobGraph getJobGraph() { @@ -1306,25 +1269,20 @@ public abstract class AbstractQueryableStateTestBase extends TestLogger { @Override public void close() throws Exception { // Free cluster resources - if (jobId != null) { - cluster.getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)); - - cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - } + clusterClient.cancel(jobId); + // cancel() is non-blocking so do this to make sure the job finished + CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay( + () -> clusterClient.getJobStatus(jobId), + Time.milliseconds(50), + deadline, + (jobStatus) -> jobStatus.equals(JobStatus.CANCELED), + TestingUtils.defaultScheduledExecutor()); + assertEquals( + JobStatus.CANCELED, + jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS)); } } - private static CompletableFuture<TestingJobManagerMessages.JobStatusIs> notifyWhenJobStatusIs( - final JobID jobId, final JobStatus status, final Deadline deadline) { - - return FutureUtils.toJava( - cluster.getLeaderGateway(deadline.timeLeft()) - .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, status), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class))); - } - private static <K, S extends State, V> CompletableFuture<S> getKvState( final Deadline deadline, final QueryableStateClient client, http://git-wip-us.apache.org/repos/asf/flink/blob/735c95fb/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java deleted file mode 100644 index 8767b52..0000000 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.itcases; - -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.HighAvailabilityOptions; -import org.apache.flink.configuration.QueryableStateOptions; -import org.apache.flink.queryablestate.client.QueryableStateClient; -import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; -import org.apache.flink.runtime.testingUtils.TestingCluster; - -import org.apache.curator.test.TestingServer; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.rules.TemporaryFolder; - -import java.io.IOException; - -import static org.junit.Assert.fail; - -/** - * Base class with the cluster configuration for the tests on the NON-HA mode. - */ -public abstract class HAAbstractQueryableStateTestBase extends AbstractQueryableStateTestBase { - - private static final int NUM_JMS = 2; - private static final int NUM_TMS = 2; - private static final int NUM_SLOTS_PER_TM = 4; - - private static TestingServer zkServer; - private static TemporaryFolder temporaryFolder; - - public static void setup(int proxyPortRangeStart, int serverPortRangeStart) { - try { - zkServer = new TestingServer(); - temporaryFolder = new TemporaryFolder(); - temporaryFolder.create(); - - Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); - config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2); - config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2); - config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2); - config.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRangeStart + "-" + (proxyPortRangeStart + NUM_TMS)); - config.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRangeStart + "-" + (serverPortRangeStart + NUM_TMS)); - config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString()); - config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString()); - config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); - - cluster = new TestingCluster(config, false); - cluster.start(true); - - client = new QueryableStateClient("localhost", proxyPortRangeStart); - - // verify that we are in HA mode - Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.ZOOKEEPER); - - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @AfterClass - public static void tearDown() throws IOException { - client.shutdownAndWait(); - - cluster.stop(); - cluster.awaitTermination(); - - zkServer.stop(); - zkServer.close(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/735c95fb/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java index 6f31e76..a47045f 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java @@ -18,28 +18,102 @@ package org.apache.flink.queryablestate.itcases; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.QueryableStateOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.queryablestate.client.QueryableStateClient; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Rule; +import org.junit.ClassRule; import org.junit.rules.TemporaryFolder; /** * Several integration tests for queryable state using the {@link FsStateBackend}. */ -public class HAQueryableStateFsBackendITCase extends HAAbstractQueryableStateTestBase { +public class HAQueryableStateFsBackendITCase extends AbstractQueryableStateTestBase { - @Rule - public TemporaryFolder temporaryFolder = new TemporaryFolder(); + private static final int NUM_JMS = 2; + // NUM_TMS * NUM_SLOTS_PER_TM must match the parallelism of the pipelines so that + // we always use all TaskManagers so that the JM oracle is always properly re-registered + private static final int NUM_TMS = 2; + private static final int NUM_SLOTS_PER_TM = 2; - @BeforeClass - public static void setup() { - setup(9064, 9069); - } + private static final int QS_PROXY_PORT_RANGE_START = 9064; + private static final int QS_SERVER_PORT_RANGE_START = 9069; + + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static TestingServer zkServer; + + private static MiniClusterResource miniClusterResource; @Override protected AbstractStateBackend createStateBackend() throws Exception { return new FsStateBackend(temporaryFolder.newFolder().toURI().toString()); } + + @BeforeClass + public static void setup() throws Exception { + zkServer = new TestingServer(); + + // we have to manage this manually because we have to create the ZooKeeper server + // ahead of this + miniClusterResource = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + getConfig(), + NUM_TMS, + NUM_SLOTS_PER_TM), + true); + + miniClusterResource.before(); + + client = new QueryableStateClient("localhost", QS_PROXY_PORT_RANGE_START); + + clusterClient = miniClusterResource.getClusterClient(); + } + + @AfterClass + public static void tearDown() throws Exception { + miniClusterResource.after(); + + client.shutdownAndWait(); + + zkServer.stop(); + zkServer.close(); + } + + private static Configuration getConfig() throws Exception { + + Configuration config = new Configuration(); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L); + config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); + config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2); + config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2); + config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2); + config.setString( + QueryableStateOptions.PROXY_PORT_RANGE, + QS_PROXY_PORT_RANGE_START + "-" + (QS_PROXY_PORT_RANGE_START + NUM_TMS)); + config.setString( + QueryableStateOptions.SERVER_PORT_RANGE, + QS_SERVER_PORT_RANGE_START + "-" + (QS_SERVER_PORT_RANGE_START + NUM_TMS)); + config.setBoolean(WebOptions.SUBMIT_ENABLE, false); + + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString()); + + config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString()); + config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); + + return config; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/735c95fb/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java index cae02e2..b1092c1 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java @@ -18,28 +18,103 @@ package org.apache.flink.queryablestate.itcases; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.QueryableStateOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.configuration.WebOptions; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.queryablestate.client.QueryableStateClient; import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Rule; +import org.junit.ClassRule; import org.junit.rules.TemporaryFolder; /** * Several integration tests for queryable state using the {@link RocksDBStateBackend}. */ -public class HAQueryableStateRocksDBBackendITCase extends HAAbstractQueryableStateTestBase { +public class HAQueryableStateRocksDBBackendITCase extends AbstractQueryableStateTestBase { - @Rule - public TemporaryFolder temporaryFolder = new TemporaryFolder(); + private static final int NUM_JMS = 2; + // NUM_TMS * NUM_SLOTS_PER_TM must match the parallelism of the pipelines so that + // we always use all TaskManagers so that the JM oracle is always properly re-registered + private static final int NUM_TMS = 2; + private static final int NUM_SLOTS_PER_TM = 2; - @BeforeClass - public static void setup() { - setup(9074, 9079); - } + private static final int QS_PROXY_PORT_RANGE_START = 9074; + private static final int QS_SERVER_PORT_RANGE_START = 9079; + + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static TestingServer zkServer; + + private static MiniClusterResource miniClusterResource; @Override protected AbstractStateBackend createStateBackend() throws Exception { return new RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString()); } + + @BeforeClass + public static void setup() throws Exception { + zkServer = new TestingServer(); + + // we have to manage this manually because we have to create the ZooKeeper server + // ahead of this + miniClusterResource = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + getConfig(), + NUM_TMS, + NUM_SLOTS_PER_TM), + true); + + miniClusterResource.before(); + + client = new QueryableStateClient("localhost", QS_PROXY_PORT_RANGE_START); + + clusterClient = miniClusterResource.getClusterClient(); + } + + @AfterClass + public static void tearDown() throws Exception { + miniClusterResource.after(); + + client.shutdownAndWait(); + + zkServer.stop(); + zkServer.close(); + } + + private static Configuration getConfig() throws Exception { + + Configuration config = new Configuration(); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L); + config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); + config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2); + config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2); + config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2); + config.setString( + QueryableStateOptions.PROXY_PORT_RANGE, + QS_PROXY_PORT_RANGE_START + "-" + (QS_PROXY_PORT_RANGE_START + NUM_TMS)); + config.setString( + QueryableStateOptions.SERVER_PORT_RANGE, + QS_SERVER_PORT_RANGE_START + "-" + (QS_SERVER_PORT_RANGE_START + NUM_TMS)); + config.setBoolean(WebOptions.SUBMIT_ENABLE, false); + + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString()); + + config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString()); + config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); + + return config; + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/735c95fb/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java deleted file mode 100644 index 2686a29..0000000 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.itcases; - -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.QueryableStateOptions; -import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.queryablestate.client.QueryableStateClient; -import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; -import org.apache.flink.runtime.testingUtils.TestingCluster; - -import org.junit.AfterClass; -import org.junit.Assert; - -import static org.junit.Assert.fail; - -/** - * Base class with the cluster configuration for the tests on the HA mode. - */ -public abstract class NonHAAbstractQueryableStateTestBase extends AbstractQueryableStateTestBase { - - private static final int NUM_TMS = 2; - private static final int NUM_SLOTS_PER_TM = 4; - - public static void setup(int proxyPortRangeStart, int serverPortRangeStart) { - try { - Configuration config = new Configuration(); - config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); - config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1); - config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 1); - config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1); - config.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRangeStart + "-" + (proxyPortRangeStart + NUM_TMS)); - config.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRangeStart + "-" + (serverPortRangeStart + NUM_TMS)); - - cluster = new TestingCluster(config, false); - cluster.start(true); - - client = new QueryableStateClient("localhost", proxyPortRangeStart); - - // verify that we are not in HA mode - Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.NONE); - - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @AfterClass - public static void tearDown() { - client.shutdownAndWait(); - - cluster.stop(); - cluster.awaitTermination(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/735c95fb/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java index 9457e0f..eb300c1 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java @@ -18,28 +18,78 @@ package org.apache.flink.queryablestate.itcases; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.QueryableStateOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.queryablestate.client.QueryableStateClient; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.test.util.MiniClusterResource; +import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Rule; import org.junit.rules.TemporaryFolder; /** * Several integration tests for queryable state using the {@link FsStateBackend}. */ -public class NonHAQueryableStateFsBackendITCase extends NonHAAbstractQueryableStateTestBase { +public class NonHAQueryableStateFsBackendITCase extends AbstractQueryableStateTestBase { + + // NUM_TMS * NUM_SLOTS_PER_TM must match the parallelism of the pipelines so that + // we always use all TaskManagers so that the JM oracle is always properly re-registered + private static final int NUM_TMS = 2; + private static final int NUM_SLOTS_PER_TM = 2; + + private static final int QS_PROXY_PORT_RANGE_START = 9084; + private static final int QS_SERVER_PORT_RANGE_START = 9089; @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); - @BeforeClass - public static void setup() { - setup(9084, 9089); - } + @ClassRule + public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + getConfig(), + NUM_TMS, + NUM_SLOTS_PER_TM), + true); @Override protected AbstractStateBackend createStateBackend() throws Exception { return new FsStateBackend(temporaryFolder.newFolder().toURI().toString()); } + + @BeforeClass + public static void setup() throws Exception { + client = new QueryableStateClient("localhost", QS_PROXY_PORT_RANGE_START); + + clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient(); + } + + @AfterClass + public static void tearDown() { + client.shutdownAndWait(); + } + + private static Configuration getConfig() { + Configuration config = new Configuration(); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); + config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1); + config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 1); + config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1); + config.setString( + QueryableStateOptions.PROXY_PORT_RANGE, + QS_PROXY_PORT_RANGE_START + "-" + (QS_PROXY_PORT_RANGE_START + NUM_TMS)); + config.setString( + QueryableStateOptions.SERVER_PORT_RANGE, + QS_SERVER_PORT_RANGE_START + "-" + (QS_SERVER_PORT_RANGE_START + NUM_TMS)); + config.setBoolean(WebOptions.SUBMIT_ENABLE, false); + return config; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/735c95fb/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java index 7778a94..3d6a3e3 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java @@ -18,28 +18,79 @@ package org.apache.flink.queryablestate.itcases; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.QueryableStateOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.configuration.WebOptions; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.queryablestate.client.QueryableStateClient; import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.test.util.MiniClusterResource; +import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Rule; import org.junit.rules.TemporaryFolder; /** * Several integration tests for queryable state using the {@link RocksDBStateBackend}. */ -public class NonHAQueryableStateRocksDBBackendITCase extends NonHAAbstractQueryableStateTestBase { +public class NonHAQueryableStateRocksDBBackendITCase extends AbstractQueryableStateTestBase { + + // NUM_TMS * NUM_SLOTS_PER_TM must match the parallelism of the pipelines so that + // we always use all TaskManagers so that the JM oracle is always properly re-registered + private static final int NUM_TMS = 2; + private static final int NUM_SLOTS_PER_TM = 2; + + private static final int QS_PROXY_PORT_RANGE_START = 9094; + private static final int QS_SERVER_PORT_RANGE_START = 9099; @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); - @BeforeClass - public static void setup() { - setup(9094, 9099); - } + @ClassRule + public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + getConfig(), + NUM_TMS, + NUM_SLOTS_PER_TM), + true); @Override protected AbstractStateBackend createStateBackend() throws Exception { return new RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString()); } + + @BeforeClass + public static void setup() throws Exception { + client = new QueryableStateClient("localhost", QS_PROXY_PORT_RANGE_START); + + clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient(); + } + + @AfterClass + public static void tearDown() { + client.shutdownAndWait(); + } + + private static Configuration getConfig() { + Configuration config = new Configuration(); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); + config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1); + config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 1); + config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1); + config.setString( + QueryableStateOptions.PROXY_PORT_RANGE, + QS_PROXY_PORT_RANGE_START + "-" + (QS_PROXY_PORT_RANGE_START + NUM_TMS)); + config.setString( + QueryableStateOptions.SERVER_PORT_RANGE, + QS_SERVER_PORT_RANGE_START + "-" + (QS_SERVER_PORT_RANGE_START + NUM_TMS)); + config.setBoolean(WebOptions.SUBMIT_ENABLE, false); + return config; + } + }