[FLINK-8778] Port queryable state ITCases to use MiniClusterResource

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/735c95fb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/735c95fb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/735c95fb

Branch: refs/heads/release-1.5
Commit: 735c95fbb93316475dc6dd6daa2aa91a7158c843
Parents: 7ab8ac3
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Mon Feb 26 11:55:14 2018 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Sun Mar 11 08:31:56 2018 -0700

----------------------------------------------------------------------
 .../itcases/AbstractQueryableStateTestBase.java | 240 ++++++++-----------
 .../HAAbstractQueryableStateTestBase.java       |  93 -------
 .../HAQueryableStateFsBackendITCase.java        |  90 ++++++-
 .../HAQueryableStateRocksDBBackendITCase.java   |  91 ++++++-
 .../NonHAAbstractQueryableStateTestBase.java    |  75 ------
 .../NonHAQueryableStateFsBackendITCase.java     |  60 ++++-
 ...NonHAQueryableStateRocksDBBackendITCase.java |  61 ++++-
 7 files changed, 375 insertions(+), 335 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/735c95fb/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index 623e42b..e99a28b 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -37,12 +37,15 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.queryablestate.client.VoidNamespace;
@@ -53,12 +56,9 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
-import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.QueryableStateStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -68,6 +68,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
@@ -76,6 +77,7 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -93,11 +95,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongArray;
 
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
-
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -106,7 +106,7 @@ import static org.junit.Assert.fail;
  */
 public abstract class AbstractQueryableStateTestBase extends TestLogger {
 
-       private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10000L, TimeUnit.SECONDS);
+       private static final Duration TEST_TIMEOUT = Duration.ofSeconds(10000L);
        public static final long RETRY_TIMEOUT = 50L;
 
        private final ScheduledExecutorService executorService = 
Executors.newScheduledThreadPool(4);
@@ -118,27 +118,22 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
        protected AbstractStateBackend stateBackend;
 
        /**
-        * Shared between all the test. Make sure to have at least NUM_SLOTS
-        * available after your test finishes, e.g. cancel the job you 
submitted.
-        */
-       protected static FlinkMiniCluster cluster;
-
-       /**
         * Client shared between all the test.
         */
        protected static QueryableStateClient client;
 
+       protected static ClusterClient<?> clusterClient;
+
        protected static int maxParallelism;
 
        @Before
        public void setUp() throws Exception {
-               // NOTE: do not use a shared instance for all tests as the tests may brake
+               // NOTE: do not use a shared instance for all tests as the tests may break
                this.stateBackend = createStateBackend();
 
-               Assert.assertNotNull(cluster);
+               Assert.assertNotNull(clusterClient);
 
-               maxParallelism = 
cluster.configuration().getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
1) *
-                               
cluster.configuration().getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
1);
+               maxParallelism = 4;
        }
 
        /**
@@ -160,8 +155,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
        @Test
        @SuppressWarnings("unchecked")
        public void testQueryableState() throws Exception {
-
-               final Deadline deadline = TEST_TIMEOUT.fromNow();
+               final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
                final int numKeys = 256;
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -188,12 +182,13 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                        }
                }).asQueryableState(queryName, reducingState);
 
-               try (AutoCancellableJob autoCancellableJob = new 
AutoCancellableJob(cluster, env, deadline)) {
+               try (AutoCancellableJob autoCancellableJob = new 
AutoCancellableJob(deadline, clusterClient, env)) {
 
                        final JobID jobId = autoCancellableJob.getJobId();
                        final JobGraph jobGraph = 
autoCancellableJob.getJobGraph();
 
-                       cluster.submitJobDetached(jobGraph);
+                       clusterClient.setDetached(true);
+                       clusterClient.submitJob(jobGraph, 
AbstractQueryableStateTestBase.class.getClassLoader());
 
                        final AtomicLongArray counts = new 
AtomicLongArray(numKeys);
 
@@ -257,9 +252,8 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
        /**
         * Tests that duplicate query registrations fail the job at the 
JobManager.
         */
-       @Test
+       @Test(timeout = 60_000)
        public void testDuplicateRegistrationFailsJob() throws Exception {
-               final Deadline deadline = TEST_TIMEOUT.fromNow();
                final int numKeys = 256;
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -302,54 +296,19 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
 
                // Submit the job graph
                final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-               final JobID jobId = jobGraph.getJobID();
-
-               final CompletableFuture<TestingJobManagerMessages.JobStatusIs> 
failedFuture =
-                               notifyWhenJobStatusIs(jobId, JobStatus.FAILED, 
deadline);
 
-               final CompletableFuture<TestingJobManagerMessages.JobStatusIs> 
cancellationFuture =
-                               notifyWhenJobStatusIs(jobId, 
JobStatus.CANCELED, deadline);
-
-               cluster.submitJobDetached(jobGraph);
+               clusterClient.setDetached(false);
 
+               boolean caughtException = false;
                try {
-                       final TestingJobManagerMessages.JobStatusIs jobStatus =
-                                       
failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
-                       assertEquals(JobStatus.FAILED, jobStatus.state());
-               } catch (Exception e) {
-
-                       // if the assertion fails, it means that the job was 
(falsely) not cancelled.
-                       // in this case, and given that the mini-cluster is 
shared with other tests,
-                       // we cancel the job and wait for the cancellation so 
that the resources are freed.
-
-                       if (jobId != null) {
-                               cluster.getLeaderGateway(deadline.timeLeft())
-                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
-
-                               
cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-                       }
-
-                       // and we re-throw the exception.
-                       throw e;
+                       clusterClient.submitJob(jobGraph, 
AbstractQueryableStateTestBase.class.getClassLoader());
+               } catch (ProgramInvocationException e) {
+                       String failureCause = 
ExceptionUtils.stringifyException(e);
+                       assertThat(failureCause, containsString("KvState with 
name '" + queryName + "' has already been registered by another operator"));
+                       caughtException = true;
                }
 
-               // Get the job and check the cause
-               JobManagerMessages.JobFound jobFound = FutureUtils.toJava(
-                               cluster.getLeaderGateway(deadline.timeLeft())
-                                               .ask(new 
JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
-                                               
.mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)))
-                               .get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
-
-               String failureCause = 
jobFound.executionGraph().getFailureInfo().getExceptionAsString();
-
-               assertEquals(JobStatus.FAILED, 
jobFound.executionGraph().getState());
-               assertTrue("Not instance of SuppressRestartsException", 
failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));
-               int causedByIndex = failureCause.indexOf("Caused by: ");
-               String subFailureCause = failureCause.substring(causedByIndex + 
"Caused by: ".length());
-               assertTrue("Not caused by IllegalStateException", 
subFailureCause.startsWith("java.lang.IllegalStateException"));
-               assertTrue("Exception does not contain registration name", 
subFailureCause.contains(queryName));
+               assertTrue(caughtException);
        }
 
        /**
@@ -360,8 +319,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
         */
        @Test
        public void testValueState() throws Exception {
-
-               final Deadline deadline = TEST_TIMEOUT.fromNow();
+               final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
                final long numElements = 1024L;
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -386,12 +344,13 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                        }
                }).asQueryableState("hakuna", valueState);
 
-               try (AutoCancellableJob autoCancellableJob = new 
AutoCancellableJob(cluster, env, deadline)) {
+               try (AutoCancellableJob autoCancellableJob = new 
AutoCancellableJob(deadline, clusterClient, env)) {
 
                        final JobID jobId = autoCancellableJob.getJobId();
                        final JobGraph jobGraph = 
autoCancellableJob.getJobGraph();
 
-                       cluster.submitJobDetached(jobGraph);
+                       clusterClient.setDetached(true);
+                       clusterClient.submitJob(jobGraph, 
AbstractQueryableStateTestBase.class.getClassLoader());
 
                        executeValueQuery(deadline, client, jobId, "hakuna", 
valueState, numElements);
                }
@@ -404,8 +363,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
        @Test
        @Ignore
        public void testWrongJobIdAndWrongQueryableStateName() throws Exception 
{
-
-               final Deadline deadline = TEST_TIMEOUT.fromNow();
+               final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
                final long numElements = 1024L;
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -425,18 +383,22 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                        }
                }).asQueryableState("hakuna", valueState);
 
-               try (AutoCancellableJob closableJobGraph = new 
AutoCancellableJob(cluster, env, deadline)) {
+               try (AutoCancellableJob closableJobGraph = new 
AutoCancellableJob(deadline, clusterClient, env)) {
 
-                       // register to be notified when the job is running.
-                       
CompletableFuture<TestingJobManagerMessages.JobStatusIs> runningFuture =
-                                       
notifyWhenJobStatusIs(closableJobGraph.getJobId(), JobStatus.RUNNING, deadline);
+                       clusterClient.setDetached(true);
+                       clusterClient.submitJob(
+                               closableJobGraph.getJobGraph(), 
AbstractQueryableStateTestBase.class.getClassLoader());
 
-                       
cluster.submitJobDetached(closableJobGraph.getJobGraph());
+                       CompletableFuture<JobStatus> jobStatusFuture =
+                               
clusterClient.getJobStatus(closableJobGraph.getJobId());
+
+                       while (deadline.hasTimeLeft() && 
!jobStatusFuture.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS).equals(JobStatus.RUNNING)) {
+                               Thread.sleep(50);
+                               jobStatusFuture =
+                                       
clusterClient.getJobStatus(closableJobGraph.getJobId());
+                       }
 
-                       // expect for the job to be running
-                       TestingJobManagerMessages.JobStatusIs jobStatus =
-                                       
runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-                       assertEquals(JobStatus.RUNNING, jobStatus.state());
+                       assertEquals(JobStatus.RUNNING, 
jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
 
                        final JobID wrongJobId = new JobID();
 
@@ -484,14 +446,13 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
         */
        @Test
        public void testQueryNonStartedJobState() throws Exception {
-
-               final Deadline deadline = TEST_TIMEOUT.fromNow();
+               final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
                final long numElements = 1024L;
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setStateBackend(stateBackend);
                env.setParallelism(maxParallelism);
-               // Very important, because cluster is shared between tests and 
we
+               // Very important, because clusterClient is shared between 
tests and we
                // don't explicitly check that all slots are available before
                // submitting.
                
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000L));
@@ -512,7 +473,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                                        }
                                }).asQueryableState("hakuna", valueState);
 
-               try (AutoCancellableJob autoCancellableJob = new 
AutoCancellableJob(cluster, env, deadline)) {
+               try (AutoCancellableJob autoCancellableJob = new 
AutoCancellableJob(deadline, clusterClient, env)) {
 
                        final JobID jobId = autoCancellableJob.getJobId();
                        final JobGraph jobGraph = 
autoCancellableJob.getJobGraph();
@@ -527,7 +488,8 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                                        BasicTypeInfo.INT_TYPE_INFO,
                                        valueState);
 
-                       cluster.submitJobDetached(jobGraph);
+                       clusterClient.setDetached(true);
+                       clusterClient.submitJob(jobGraph, 
AbstractQueryableStateTestBase.class.getClassLoader());
 
                        executeValueQuery(deadline, client, jobId, "hakuna", 
valueState, expected);
                }
@@ -543,8 +505,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
         */
        @Test(expected = UnknownKeyOrNamespaceException.class)
        public void testValueStateDefault() throws Throwable {
-
-               final Deadline deadline = TEST_TIMEOUT.fromNow();
+               final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
                final long numElements = 1024L;
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -571,12 +532,13 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                                        }
                                }).asQueryableState("hakuna", valueState);
 
-               try (AutoCancellableJob autoCancellableJob = new 
AutoCancellableJob(cluster, env, deadline)) {
+               try (AutoCancellableJob autoCancellableJob = new 
AutoCancellableJob(deadline, clusterClient, env)) {
 
                        final JobID jobId = autoCancellableJob.getJobId();
                        final JobGraph jobGraph = 
autoCancellableJob.getJobGraph();
 
-                       cluster.submitJobDetached(jobGraph);
+                       clusterClient.setDetached(true);
+                       clusterClient.submitJob(jobGraph, 
AbstractQueryableStateTestBase.class.getClassLoader());
 
                        // Now query
                        int key = 0;
@@ -611,8 +573,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
         */
        @Test
        public void testValueStateShortcut() throws Exception {
-
-               final Deadline deadline = TEST_TIMEOUT.fromNow();
+               final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
                final long numElements = 1024L;
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -639,12 +600,14 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                final ValueStateDescriptor<Tuple2<Integer, Long>> stateDesc =
                                (ValueStateDescriptor<Tuple2<Integer, Long>>) 
queryableState.getStateDescriptor();
 
-               try (AutoCancellableJob autoCancellableJob = new 
AutoCancellableJob(cluster, env, deadline)) {
+               try (AutoCancellableJob autoCancellableJob = new 
AutoCancellableJob(deadline, clusterClient, env)) {
 
                        final JobID jobId = autoCancellableJob.getJobId();
                        final JobGraph jobGraph = 
autoCancellableJob.getJobGraph();
 
-                       cluster.submitJobDetached(jobGraph);
+                       clusterClient.setDetached(true);
+                       clusterClient.submitJob(jobGraph, 
AbstractQueryableStateTestBase.class.getClassLoader());
+
                        executeValueQuery(deadline, client, jobId, "matata", 
stateDesc, numElements);
                }
        }
@@ -658,8 +621,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
         */
        @Test
        public void testFoldingState() throws Exception {
-
-               final Deadline deadline = TEST_TIMEOUT.fromNow();
+               final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
                final int numElements = 1024;
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -684,12 +646,13 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                        }
                }).asQueryableState("pumba", foldingState);
 
-               try (AutoCancellableJob autoCancellableJob = new 
AutoCancellableJob(cluster, env, deadline)) {
+               try (AutoCancellableJob autoCancellableJob = new 
AutoCancellableJob(deadline, clusterClient, env)) {
 
                        final JobID jobId = autoCancellableJob.getJobId();
                        final JobGraph jobGraph = 
autoCancellableJob.getJobGraph();
 
-                       cluster.submitJobDetached(jobGraph);
+                       clusterClient.setDetached(true);
+                       clusterClient.submitJob(jobGraph, 
AbstractQueryableStateTestBase.class.getClassLoader());
 
                        final String expected = Integer.toString(numElements * 
(numElements + 1) / 2);
 
@@ -731,8 +694,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
         */
        @Test
        public void testReducingState() throws Exception {
-
-               final Deadline deadline = TEST_TIMEOUT.fromNow();
+               final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
                final long numElements = 1024L;
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -757,12 +719,13 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                        }
                }).asQueryableState("jungle", reducingState);
 
-               try (AutoCancellableJob autoCancellableJob = new 
AutoCancellableJob(cluster, env, deadline)) {
+               try (AutoCancellableJob autoCancellableJob = new 
AutoCancellableJob(deadline, clusterClient, env)) {
 
                        final JobID jobId = autoCancellableJob.getJobId();
                        final JobGraph jobGraph = 
autoCancellableJob.getJobGraph();
 
-                       cluster.submitJobDetached(jobGraph);
+                       clusterClient.setDetached(true);
+                       clusterClient.submitJob(jobGraph, 
AbstractQueryableStateTestBase.class.getClassLoader());
 
                        final long expected = numElements * (numElements + 1L) 
/ 2L;
 
@@ -804,8 +767,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
         */
        @Test
        public void testMapState() throws Exception {
-
-               final Deadline deadline = TEST_TIMEOUT.fromNow();
+               final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
                final long numElements = 1024L;
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -850,12 +812,13 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                        }
                });
 
-               try (AutoCancellableJob autoCancellableJob = new 
AutoCancellableJob(cluster, env, deadline)) {
+               try (AutoCancellableJob autoCancellableJob = new 
AutoCancellableJob(deadline, clusterClient, env)) {
 
                        final JobID jobId = autoCancellableJob.getJobId();
                        final JobGraph jobGraph = 
autoCancellableJob.getJobGraph();
 
-                       cluster.submitJobDetached(jobGraph);
+                       clusterClient.setDetached(true);
+                       clusterClient.submitJob(jobGraph, 
AbstractQueryableStateTestBase.class.getClassLoader());
 
                        final long expected = numElements * (numElements + 1L) 
/ 2L;
 
@@ -897,8 +860,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
         */
        @Test
        public void testListState() throws Exception {
-
-               final Deadline deadline = TEST_TIMEOUT.fromNow();
+               final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
                final long numElements = 1024L;
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -939,12 +901,13 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                        }
                });
 
-               try (AutoCancellableJob autoCancellableJob = new 
AutoCancellableJob(cluster, env, deadline)) {
+               try (AutoCancellableJob autoCancellableJob = new 
AutoCancellableJob(deadline, clusterClient, env)) {
 
                        final JobID jobId = autoCancellableJob.getJobId();
                        final JobGraph jobGraph = 
autoCancellableJob.getJobGraph();
 
-                       cluster.submitJobDetached(jobGraph);
+                       clusterClient.setDetached(true);
+                       clusterClient.submitJob(jobGraph, 
AbstractQueryableStateTestBase.class.getClassLoader());
 
                        final Map<Integer, Set<Long>> results = new HashMap<>();
 
@@ -994,8 +957,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
 
        @Test
        public void testAggregatingState() throws Exception {
-
-               final Deadline deadline = TEST_TIMEOUT.fromNow();
+               final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
                final long numElements = 1024L;
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -1025,12 +987,13 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                                new AggregatingTestOperator(aggrStateDescriptor)
                );
 
-               try (AutoCancellableJob autoCancellableJob = new 
AutoCancellableJob(cluster, env, deadline)) {
+               try (AutoCancellableJob autoCancellableJob = new 
AutoCancellableJob(deadline, clusterClient, env)) {
 
                        final JobID jobId = autoCancellableJob.getJobId();
                        final JobGraph jobGraph = 
autoCancellableJob.getJobGraph();
 
-                       cluster.submitJobDetached(jobGraph);
+                       clusterClient.setDetached(true);
+                       clusterClient.submitJob(jobGraph, 
AbstractQueryableStateTestBase.class.getClassLoader());
 
                        for (int key = 0; key < maxParallelism; key++) {
                                boolean success = false;
@@ -1277,22 +1240,22 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
         */
        private static class AutoCancellableJob implements AutoCloseable {
 
-               private final FlinkMiniCluster cluster;
-               private final Deadline deadline;
+               private final ClusterClient<?> clusterClient;
                private final JobGraph jobGraph;
 
                private final JobID jobId;
-               private final 
CompletableFuture<TestingJobManagerMessages.JobStatusIs> cancellationFuture;
 
-               AutoCancellableJob(final FlinkMiniCluster cluster, final 
StreamExecutionEnvironment env, final Deadline deadline) {
+               private final Deadline deadline;
+
+               AutoCancellableJob(Deadline deadline, final ClusterClient<?> 
clusterClient, final StreamExecutionEnvironment env) {
                        Preconditions.checkNotNull(env);
 
-                       this.cluster = Preconditions.checkNotNull(cluster);
+                       this.clusterClient = 
Preconditions.checkNotNull(clusterClient);
                        this.jobGraph = env.getStreamGraph().getJobGraph();
-                       this.deadline = Preconditions.checkNotNull(deadline);
 
-                       this.jobId = jobGraph.getJobID();
-                       this.cancellationFuture = notifyWhenJobStatusIs(jobId, 
JobStatus.CANCELED, deadline);
+                       this.jobId = 
Preconditions.checkNotNull(jobGraph.getJobID());
+
+                       this.deadline = deadline;
                }
 
                JobGraph getJobGraph() {
@@ -1306,25 +1269,20 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                @Override
                public void close() throws Exception {
                        // Free cluster resources
-                       if (jobId != null) {
-                               cluster.getLeaderGateway(deadline.timeLeft())
-                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
-
-                               
cancellationFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-                       }
+                       clusterClient.cancel(jobId);
+                       // cancel() is non-blocking, so poll the job status until 
it is CANCELED
+                       CompletableFuture<JobStatus> jobStatusFuture = 
FutureUtils.retrySuccesfulWithDelay(
+                               () -> clusterClient.getJobStatus(jobId),
+                               Time.milliseconds(50),
+                               deadline,
+                               (jobStatus) -> 
jobStatus.equals(JobStatus.CANCELED),
+                               TestingUtils.defaultScheduledExecutor());
+                       assertEquals(
+                               JobStatus.CANCELED,
+                               
jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
                }
        }
 
-       private static CompletableFuture<TestingJobManagerMessages.JobStatusIs> 
notifyWhenJobStatusIs(
-                       final JobID jobId, final JobStatus status, final 
Deadline deadline) {
-
-               return FutureUtils.toJava(
-                               cluster.getLeaderGateway(deadline.timeLeft())
-                                               .ask(new 
TestingJobManagerMessages.NotifyWhenJobStatus(jobId, status), 
deadline.timeLeft())
-                                               
.mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
-       }
-
        private static <K, S extends State, V> CompletableFuture<S> getKvState(
                        final Deadline deadline,
                        final QueryableStateClient client,

http://git-wip-us.apache.org/repos/asf/flink/blob/735c95fb/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
deleted file mode 100644
index 8767b52..0000000
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.itcases;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.QueryableStateOptions;
-import org.apache.flink.queryablestate.client.QueryableStateClient;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-
-import org.apache.curator.test.TestingServer;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.IOException;
-
-import static org.junit.Assert.fail;
-
-/**
- * Base class with the cluster configuration for the tests on the NON-HA mode.
- */
-public abstract class HAAbstractQueryableStateTestBase extends 
AbstractQueryableStateTestBase {
-
-       private static final int NUM_JMS = 2;
-       private static final int NUM_TMS = 2;
-       private static final int NUM_SLOTS_PER_TM = 4;
-
-       private static TestingServer zkServer;
-       private static TemporaryFolder temporaryFolder;
-
-       public static void setup(int proxyPortRangeStart, int 
serverPortRangeStart) {
-               try {
-                       zkServer = new TestingServer();
-                       temporaryFolder = new TemporaryFolder();
-                       temporaryFolder.create();
-
-                       Configuration config = new Configuration();
-                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
-                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
NUM_SLOTS_PER_TM);
-                       
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
-                       
config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2);
-                       
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
-                       
config.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRangeStart + 
"-" + (proxyPortRangeStart + NUM_TMS));
-                       
config.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRangeStart 
+ "-" + (serverPortRangeStart + NUM_TMS));
-                       
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
temporaryFolder.newFolder().toString());
-                       
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zkServer.getConnectString());
-                       config.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");
-
-                       cluster = new TestingCluster(config, false);
-                       cluster.start(true);
-
-                       client = new QueryableStateClient("localhost", 
proxyPortRangeStart);
-
-                       // verify that we are in HA mode
-                       Assert.assertTrue(cluster.haMode() == 
HighAvailabilityMode.ZOOKEEPER);
-
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @AfterClass
-       public static void tearDown() throws IOException {
-               client.shutdownAndWait();
-
-               cluster.stop();
-               cluster.awaitTermination();
-
-               zkServer.stop();
-               zkServer.close();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/735c95fb/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
index 6f31e76..a47045f 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
@@ -18,28 +18,102 @@
 
 package org.apache.flink.queryablestate.itcases;
 
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.test.util.MiniClusterResource;
 
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Rule;
+import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
 
 /**
  * Several integration tests for queryable state using the {@link 
FsStateBackend}.
  */
-public class HAQueryableStateFsBackendITCase extends 
HAAbstractQueryableStateTestBase {
+public class HAQueryableStateFsBackendITCase extends 
AbstractQueryableStateTestBase {
 
-       @Rule
-       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+       private static final int NUM_JMS = 2;
+       // NUM_TMS * NUM_SLOTS_PER_TM must match the parallelism of the 
pipelines. That way
+       // every job uses all TaskManagers, so the JM oracle is always 
properly re-registered.
+       private static final int NUM_TMS = 2;
+       private static final int NUM_SLOTS_PER_TM = 2;
 
-       @BeforeClass
-       public static void setup() {
-               setup(9064, 9069);
-       }
+       private static final int QS_PROXY_PORT_RANGE_START = 9064;
+       private static final int QS_SERVER_PORT_RANGE_START = 9069;
+
+       @ClassRule
+       public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       private static TestingServer zkServer;
+
+       private static MiniClusterResource miniClusterResource;
 
        @Override
        protected AbstractStateBackend createStateBackend() throws Exception {
                return new 
FsStateBackend(temporaryFolder.newFolder().toURI().toString());
        }
+
+       @BeforeClass
+       public static void setup() throws Exception {
+               zkServer = new TestingServer();
+
+               // The MiniClusterResource is managed manually because the 
ZooKeeper server must
+               // be created before the cluster is started.
+               miniClusterResource = new MiniClusterResource(
+                       new 
MiniClusterResource.MiniClusterResourceConfiguration(
+                               getConfig(),
+                               NUM_TMS,
+                               NUM_SLOTS_PER_TM),
+                       true);
+
+               miniClusterResource.before();
+
+               client = new QueryableStateClient("localhost", 
QS_PROXY_PORT_RANGE_START);
+
+               clusterClient = miniClusterResource.getClusterClient();
+       }
+
+       @AfterClass
+       public static void tearDown() throws Exception {
+               miniClusterResource.after();
+
+               client.shutdownAndWait();
+
+               zkServer.stop();
+               zkServer.close();
+       }
+
+       private static Configuration getConfig() throws Exception {
+
+               Configuration config = new Configuration();
+               config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+               config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 
NUM_JMS);
+               config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
NUM_TMS);
+               config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
NUM_SLOTS_PER_TM);
+               config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 
2);
+               config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 
2);
+               config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 
2);
+               config.setString(
+                       QueryableStateOptions.PROXY_PORT_RANGE,
+                       QS_PROXY_PORT_RANGE_START + "-" + 
(QS_PROXY_PORT_RANGE_START + NUM_TMS));
+               config.setString(
+                       QueryableStateOptions.SERVER_PORT_RANGE,
+                       QS_SERVER_PORT_RANGE_START + "-" + 
(QS_SERVER_PORT_RANGE_START + NUM_TMS));
+               config.setBoolean(WebOptions.SUBMIT_ENABLE, false);
+
+               config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
temporaryFolder.newFolder().toString());
+
+               config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zkServer.getConnectString());
+               config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+
+               return config;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/735c95fb/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
index cae02e2..b1092c1 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
@@ -18,28 +18,103 @@
 
 package org.apache.flink.queryablestate.itcases;
 
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.test.util.MiniClusterResource;
 
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Rule;
+import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
 
 /**
  * Several integration tests for queryable state using the {@link 
RocksDBStateBackend}.
  */
-public class HAQueryableStateRocksDBBackendITCase extends 
HAAbstractQueryableStateTestBase {
+public class HAQueryableStateRocksDBBackendITCase extends 
AbstractQueryableStateTestBase {
 
-       @Rule
-       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+       private static final int NUM_JMS = 2;
+       // NUM_TMS * NUM_SLOTS_PER_TM must match the parallelism of the 
pipelines. That way
+       // every job uses all TaskManagers, so the JM oracle is always 
properly re-registered.
+       private static final int NUM_TMS = 2;
+       private static final int NUM_SLOTS_PER_TM = 2;
 
-       @BeforeClass
-       public static void setup() {
-               setup(9074, 9079);
-       }
+       private static final int QS_PROXY_PORT_RANGE_START = 9074;
+       private static final int QS_SERVER_PORT_RANGE_START = 9079;
+
+       @ClassRule
+       public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       private static TestingServer zkServer;
+
+       private static MiniClusterResource miniClusterResource;
 
        @Override
        protected AbstractStateBackend createStateBackend() throws Exception {
                return new 
RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
        }
+
+       @BeforeClass
+       public static void setup() throws Exception {
+               zkServer = new TestingServer();
+
+               // The MiniClusterResource is managed manually because the 
ZooKeeper server must
+               // be created before the cluster is started.
+               miniClusterResource = new MiniClusterResource(
+                       new 
MiniClusterResource.MiniClusterResourceConfiguration(
+                               getConfig(),
+                               NUM_TMS,
+                               NUM_SLOTS_PER_TM),
+                       true);
+
+               miniClusterResource.before();
+
+               client = new QueryableStateClient("localhost", 
QS_PROXY_PORT_RANGE_START);
+
+               clusterClient = miniClusterResource.getClusterClient();
+       }
+
+       @AfterClass
+       public static void tearDown() throws Exception {
+               miniClusterResource.after();
+
+               client.shutdownAndWait();
+
+               zkServer.stop();
+               zkServer.close();
+       }
+
+       private static Configuration getConfig() throws Exception {
+
+               Configuration config = new Configuration();
+               config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+               config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 
NUM_JMS);
+               config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
NUM_TMS);
+               config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
NUM_SLOTS_PER_TM);
+               config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 
2);
+               config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 
2);
+               config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 
2);
+               config.setString(
+                       QueryableStateOptions.PROXY_PORT_RANGE,
+                       QS_PROXY_PORT_RANGE_START + "-" + 
(QS_PROXY_PORT_RANGE_START + NUM_TMS));
+               config.setString(
+                       QueryableStateOptions.SERVER_PORT_RANGE,
+                       QS_SERVER_PORT_RANGE_START + "-" + 
(QS_SERVER_PORT_RANGE_START + NUM_TMS));
+               config.setBoolean(WebOptions.SUBMIT_ENABLE, false);
+
+               config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
temporaryFolder.newFolder().toString());
+
+               config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zkServer.getConnectString());
+               config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+
+               return config;
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/735c95fb/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
deleted file mode 100644
index 2686a29..0000000
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.itcases;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.QueryableStateOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.queryablestate.client.QueryableStateClient;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-
-import org.junit.AfterClass;
-import org.junit.Assert;
-
-import static org.junit.Assert.fail;
-
-/**
- * Base class with the cluster configuration for the tests on the HA mode.
- */
-public abstract class NonHAAbstractQueryableStateTestBase extends 
AbstractQueryableStateTestBase {
-
-       private static final int NUM_TMS = 2;
-       private static final int NUM_SLOTS_PER_TM = 4;
-
-       public static void setup(int proxyPortRangeStart, int 
serverPortRangeStart) {
-               try {
-                       Configuration config = new Configuration();
-                       config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
4L);
-                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
NUM_SLOTS_PER_TM);
-                       
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
-                       
config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 1);
-                       
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
-                       
config.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRangeStart + 
"-" + (proxyPortRangeStart + NUM_TMS));
-                       
config.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRangeStart 
+ "-" + (serverPortRangeStart + NUM_TMS));
-
-                       cluster = new TestingCluster(config, false);
-                       cluster.start(true);
-
-                       client = new QueryableStateClient("localhost", 
proxyPortRangeStart);
-
-                       // verify that we are not in HA mode
-                       Assert.assertTrue(cluster.haMode() == 
HighAvailabilityMode.NONE);
-
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @AfterClass
-       public static void tearDown() {
-               client.shutdownAndWait();
-
-               cluster.stop();
-               cluster.awaitTermination();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/735c95fb/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
index 9457e0f..eb300c1 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
@@ -18,28 +18,78 @@
 
 package org.apache.flink.queryablestate.itcases;
 
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.test.util.MiniClusterResource;
 
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
 /**
  * Several integration tests for queryable state using the {@link 
FsStateBackend}.
  */
-public class NonHAQueryableStateFsBackendITCase extends 
NonHAAbstractQueryableStateTestBase {
+public class NonHAQueryableStateFsBackendITCase extends 
AbstractQueryableStateTestBase {
+
+       // NUM_TMS * NUM_SLOTS_PER_TM must match the parallelism of the 
pipelines. That way
+       // every job uses all TaskManagers, so the JM oracle is always 
properly re-registered.
+       private static final int NUM_TMS = 2;
+       private static final int NUM_SLOTS_PER_TM = 2;
+
+       private static final int QS_PROXY_PORT_RANGE_START = 9084;
+       private static final int QS_SERVER_PORT_RANGE_START = 9089;
 
        @Rule
        public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-       @BeforeClass
-       public static void setup() {
-               setup(9084, 9089);
-       }
+       @ClassRule
+       public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new 
MiniClusterResource(
+               new MiniClusterResource.MiniClusterResourceConfiguration(
+                       getConfig(),
+                       NUM_TMS,
+                       NUM_SLOTS_PER_TM),
+               true);
 
        @Override
        protected AbstractStateBackend createStateBackend() throws Exception {
                return new 
FsStateBackend(temporaryFolder.newFolder().toURI().toString());
        }
+
+       @BeforeClass
+       public static void setup() throws Exception {
+               client = new QueryableStateClient("localhost", 
QS_PROXY_PORT_RANGE_START);
+
+               clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
+       }
+
+       @AfterClass
+       public static void tearDown() {
+               client.shutdownAndWait();
+       }
+
+       private static Configuration getConfig() {
+               Configuration config = new Configuration();
+               config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+               config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
NUM_TMS);
+               config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
NUM_SLOTS_PER_TM);
+               config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 
1);
+               config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 
1);
+               config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 
1);
+               config.setString(
+                       QueryableStateOptions.PROXY_PORT_RANGE,
+                       QS_PROXY_PORT_RANGE_START + "-" + 
(QS_PROXY_PORT_RANGE_START + NUM_TMS));
+               config.setString(
+                       QueryableStateOptions.SERVER_PORT_RANGE,
+                       QS_SERVER_PORT_RANGE_START + "-" + 
(QS_SERVER_PORT_RANGE_START + NUM_TMS));
+               config.setBoolean(WebOptions.SUBMIT_ENABLE, false);
+               return config;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/735c95fb/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
index 7778a94..3d6a3e3 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
@@ -18,28 +18,79 @@
 
 package org.apache.flink.queryablestate.itcases;
 
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.test.util.MiniClusterResource;
 
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
 /**
  * Several integration tests for queryable state using the {@link 
RocksDBStateBackend}.
  */
-public class NonHAQueryableStateRocksDBBackendITCase extends 
NonHAAbstractQueryableStateTestBase {
+public class NonHAQueryableStateRocksDBBackendITCase extends 
AbstractQueryableStateTestBase {
+
+       // NUM_TMS * NUM_SLOTS_PER_TM must match the parallelism of the 
pipelines so that
+       // all TaskManagers are always in use, which ensures the JM oracle is always 
properly re-registered
+       private static final int NUM_TMS = 2;
+       private static final int NUM_SLOTS_PER_TM = 2;
+
+       private static final int QS_PROXY_PORT_RANGE_START = 9094;
+       private static final int QS_SERVER_PORT_RANGE_START = 9099;
 
        @Rule
        public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-       @BeforeClass
-       public static void setup() {
-               setup(9094, 9099);
-       }
+       @ClassRule
+       public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new 
MiniClusterResource(
+               new MiniClusterResource.MiniClusterResourceConfiguration(
+                       getConfig(),
+                       NUM_TMS,
+                       NUM_SLOTS_PER_TM),
+               true);
 
        @Override
        protected AbstractStateBackend createStateBackend() throws Exception {
                return new 
RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
        }
+
+       @BeforeClass
+       public static void setup() throws Exception {
+               client = new QueryableStateClient("localhost", 
QS_PROXY_PORT_RANGE_START);
+
+               clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
+       }
+
+       @AfterClass
+       public static void tearDown() {
+               client.shutdownAndWait();
+       }
+
+       private static Configuration getConfig() {
+               Configuration config = new Configuration();
+               config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+               config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
NUM_TMS);
+               config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
NUM_SLOTS_PER_TM);
+               config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 
1);
+               config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 
1);
+               config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 
1);
+               config.setString(
+                       QueryableStateOptions.PROXY_PORT_RANGE,
+                       QS_PROXY_PORT_RANGE_START + "-" + 
(QS_PROXY_PORT_RANGE_START + NUM_TMS));
+               config.setString(
+                       QueryableStateOptions.SERVER_PORT_RANGE,
+                       QS_SERVER_PORT_RANGE_START + "-" + 
(QS_SERVER_PORT_RANGE_START + NUM_TMS));
+               config.setBoolean(WebOptions.SUBMIT_ENABLE, false);
+               return config;
+       }
+
 }

Reply via email to