Repository: flink
Updated Branches:
  refs/heads/master c685251ce -> e3b27edfc


[FLINK-7379] [qs] Remove HighAvailabilityServices from QS client constructor.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3b27edf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3b27edf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3b27edf

Branch: refs/heads/master
Commit: e3b27edfcc22c1157fa16a8f290636fc48bfe142
Parents: c685251
Author: kkloudas <kklou...@gmail.com>
Authored: Mon Jul 31 13:35:24 2017 +0200
Committer: kkloudas <kklou...@gmail.com>
Committed: Fri Aug 11 10:58:16 2017 +0200

----------------------------------------------------------------------
 .../runtime/query/QueryableStateClient.java     |  20 +++
 .../query/AbstractQueryableStateITCase.java     | 148 +++++++------------
 .../query/HAAbstractQueryableStateITCase.java   | 102 +++++++++++++
 .../query/HAQueryableStateITCaseFsBackend.java  |  39 +++++
 .../HAQueryableStateITCaseRocksDBBackend.java   |  39 +++++
 .../NonHAAbstractQueryableStateITCase.java      |  81 ++++++++++
 .../NonHAQueryableStateITCaseFsBackend.java     |  39 +++++
 ...NonHAQueryableStateITCaseRocksDBBackend.java |  39 +++++
 .../query/QueryableStateITCaseFsBackend.java    |  39 -----
 .../QueryableStateITCaseMemoryBackend.java      |  34 -----
 .../QueryableStateITCaseRocksDBBackend.java     |  39 -----
 11 files changed, 411 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
index 1b1c8f8..4ba6929 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
@@ -31,7 +31,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.QueryableStateOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import 
org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.FixedDelayLookupRetryStrategyFactory;
 import 
org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.LookupRetryStrategyFactory;
@@ -116,9 +118,27 @@ public class QueryableStateClient {
         * system and another for the network client.
         *
         * @param config Configuration to use.
+        * @throws Exception Failures are forwarded
+        */
+       public QueryableStateClient(Configuration config) throws Exception {
+               this(config, 
HighAvailabilityServicesUtils.createHighAvailabilityServices(
+                               config, Executors.directExecutor(), 
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION));
+       }
+
+       /**
+        * Creates a client from the given configuration.
+        *
+        * <p>This will create multiple Thread pools: one for the started actor
+        * system and another for the network client.
+        *
+        * @param config Configuration to use.
         * @param highAvailabilityServices Service factory for high 
availability services
         * @throws Exception Failures are forwarded
+        *
+        * @deprecated This constructor is deprecated and stays only for 
backwards compatibility. Use the
+        * {@link #QueryableStateClient(Configuration)} instead.
         */
+       @Deprecated
        public QueryableStateClient(
                        Configuration config,
                        HighAvailabilityServices highAvailabilityServices) 
throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
index 21e3b4c..8ac3d2f 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
@@ -34,21 +34,18 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.QueryableStateOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
-import org.apache.flink.runtime.messages.JobManagerMessages.JobFound;
+import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
 import org.apache.flink.runtime.query.QueryableStateClient;
 import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceTypeInfo;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.QueryableStateStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -61,9 +58,8 @@ import akka.dispatch.Futures;
 import akka.dispatch.OnSuccess;
 import akka.dispatch.Recover;
 import akka.pattern.Patterns;
-import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -80,25 +76,18 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 import scala.reflect.ClassTag$;
 
-import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.JobStatusIs;
-import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobStatus;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * Base class for queryable state integration tests with a configurable state 
backend.
  */
 public abstract class AbstractQueryableStateITCase extends TestLogger {
 
-       private static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(100, TimeUnit.SECONDS);
+       protected static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(10000, TimeUnit.SECONDS);
        private static final FiniteDuration QUERY_RETRY_DELAY = new 
FiniteDuration(100, TimeUnit.MILLISECONDS);
 
-       private static ActorSystem testActorSystem;
-
-       private static final int NUM_TMS = 2;
-       private static final int NUM_SLOTS_PER_TM = 4;
-       private static final int NUM_SLOTS = NUM_TMS * NUM_SLOTS_PER_TM;
+       protected static ActorSystem testActorSystem;
 
        /**
         * State backend to use.
@@ -109,47 +98,19 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
         * Shared between all the test. Make sure to have at least NUM_SLOTS
         * available after your test finishes, e.g. cancel the job you 
submitted.
         */
-       private static TestingCluster cluster;
-
-       @BeforeClass
-       public static void setup() {
-               try {
-                       Configuration config = new Configuration();
-                       config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
4L);
-                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
NUM_SLOTS_PER_TM);
-                       
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
-                       config.setBoolean(QueryableStateOptions.SERVER_ENABLE, 
true);
-                       
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
-
-                       cluster = new TestingCluster(config, false);
-                       cluster.start(true);
-
-                       testActorSystem = AkkaUtils.createDefaultActorSystem();
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
+       protected static FlinkMiniCluster cluster;
 
-       @AfterClass
-       public static void tearDown() {
-               try {
-                       cluster.shutdown();
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-
-               if (testActorSystem != null) {
-                       testActorSystem.shutdown();
-               }
-       }
+       protected static int maxParallelism;
 
        @Before
        public void setUp() throws Exception {
                // NOTE: do not use a shared instance for all tests as the 
tests may brake
                this.stateBackend = createStateBackend();
+
+               Assert.assertNotNull(cluster);
+
+               maxParallelism = 
cluster.configuration().getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
1) *
+                               
cluster.configuration().getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
1);
        }
 
        /**
@@ -175,9 +136,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
                final Deadline deadline = TEST_TIMEOUT.fromNow();
                final int numKeys = 256;
 
-               final QueryableStateClient client = new QueryableStateClient(
-                       cluster.configuration(),
-                       cluster.highAvailabilityServices());
+               final QueryableStateClient client = new 
QueryableStateClient(cluster.configuration());
 
                JobID jobId = null;
 
@@ -187,7 +146,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
                        //
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setStateBackend(stateBackend);
-                       env.setParallelism(NUM_SLOTS);
+                       env.setParallelism(maxParallelism);
                        // Very important, because cluster is shared between 
tests and we
                        // don't explicitly check that all slots are available 
before
                        // submitting.
@@ -293,10 +252,13 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
 
        /**
         * Tests that duplicate query registrations fail the job at the 
JobManager.
+        *
+        * <p><b>NOTE:</b> This test is only in the non-HA variant of the tests 
because
+        * in the HA mode we use the actual JM code which does not recognize the
+        * {@code NotifyWhenJobStatus} message.
         */
        @Test
        public void testDuplicateRegistrationFailsJob() throws Exception {
-               // Config
                final Deadline deadline = TEST_TIMEOUT.fromNow();
                final int numKeys = 256;
 
@@ -308,7 +270,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
                        //
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setStateBackend(stateBackend);
-                       env.setParallelism(NUM_SLOTS);
+                       env.setParallelism(maxParallelism);
                        // Very important, because cluster is shared between 
tests and we
                        // don't explicitly check that all slots are available 
before
                        // submitting.
@@ -349,21 +311,21 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
                        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
                        jobId = jobGraph.getJobID();
 
-                       Future<JobStatusIs> failedFuture = cluster
+                       Future<TestingJobManagerMessages.JobStatusIs> 
failedFuture = cluster
                                        .getLeaderGateway(deadline.timeLeft())
-                                       .ask(new NotifyWhenJobStatus(jobId, 
JobStatus.FAILED), deadline.timeLeft())
-                                       
.mapTo(ClassTag$.MODULE$.<JobStatusIs>apply(JobStatusIs.class));
+                                       .ask(new 
TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.FAILED), 
deadline.timeLeft())
+                                       
.mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class));
 
                        cluster.submitJobDetached(jobGraph);
 
-                       JobStatusIs jobStatus = Await.result(failedFuture, 
deadline.timeLeft());
+                       TestingJobManagerMessages.JobStatusIs jobStatus = 
Await.result(failedFuture, deadline.timeLeft());
                        assertEquals(JobStatus.FAILED, jobStatus.state());
 
                        // Get the job and check the cause
-                       JobFound jobFound = Await.result(
+                       JobManagerMessages.JobFound jobFound = Await.result(
                                        
cluster.getLeaderGateway(deadline.timeLeft())
                                                        .ask(new 
JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
-                                                       
.mapTo(ClassTag$.MODULE$.<JobFound>apply(JobFound.class)),
+                                                       
.mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)),
                                        deadline.timeLeft());
 
                        String failureCause = 
jobFound.executionGraph().getFailureCause().getExceptionAsString();
@@ -376,10 +338,10 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
                } finally {
                        // Free cluster resources
                        if (jobId != null) {
-                               Future<CancellationSuccess> cancellation = 
cluster
+                               Future<JobManagerMessages.CancellationSuccess> 
cancellation = cluster
                                                
.getLeaderGateway(deadline.timeLeft())
                                                .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
+                                               
.mapTo(ClassTag$.MODULE$.<JobManagerMessages.CancellationSuccess>apply(JobManagerMessages.CancellationSuccess.class));
 
                                Await.ready(cancellation, deadline.timeLeft());
                        }
@@ -399,15 +361,13 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
 
                final int numElements = 1024;
 
-               final QueryableStateClient client = new QueryableStateClient(
-                       cluster.configuration(),
-                       cluster.highAvailabilityServices());
+               final QueryableStateClient client = new 
QueryableStateClient(cluster.configuration());
 
                JobID jobId = null;
                try {
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setStateBackend(stateBackend);
-                       env.setParallelism(NUM_SLOTS);
+                       env.setParallelism(maxParallelism);
                        // Very important, because cluster is shared between 
tests and we
                        // don't explicitly check that all slots are available 
before
                        // submitting.
@@ -467,15 +427,13 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
 
                final int numElements = 1024;
 
-               final QueryableStateClient client = new QueryableStateClient(
-                       cluster.configuration(),
-                       cluster.highAvailabilityServices());
+               final QueryableStateClient client = new 
QueryableStateClient(cluster.configuration());
 
                JobID jobId = null;
                try {
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setStateBackend(stateBackend);
-                       env.setParallelism(NUM_SLOTS);
+                       env.setParallelism(maxParallelism);
                        // Very important, because cluster is shared between 
tests and we
                        // don't explicitly check that all slots are available 
before
                        // submitting.
@@ -536,7 +494,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
        }
 
        /**
-        * Retry a query for state for keys between 0 and {@link #NUM_SLOTS} 
until
+        * Retry a query for state for keys between 0 and {@link 
#maxParallelism} until
         * <tt>expected</tt> equals the value of the result tuple's second 
field.
         */
        private void executeQuery(
@@ -547,7 +505,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
                        final StateDescriptor<?, Tuple2<Integer, Long>> 
stateDescriptor,
                        final long expected) throws Exception {
 
-               for (int key = 0; key < NUM_SLOTS; key++) {
+               for (int key = 0; key < maxParallelism; key++) {
                        boolean success = false;
                        while (deadline.hasTimeLeft() && !success) {
                                Future<Tuple2<Integer, Long>> future = 
getKvStateWithRetries(client,
@@ -575,7 +533,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
        }
 
        /**
-        * Retry a query for state for keys between 0 and {@link #NUM_SLOTS} 
until
+        * Retry a query for state for keys between 0 and {@link 
#maxParallelism} until
         * <tt>expected</tt> equals the value of the result tuple's second 
field.
         */
        private void executeQuery(
@@ -586,7 +544,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
                        final TypeSerializer<Tuple2<Integer, Long>> 
valueSerializer,
                        final long expected) throws Exception {
 
-               for (int key = 0; key < NUM_SLOTS; key++) {
+               for (int key = 0; key < maxParallelism; key++) {
                        boolean success = false;
                        while (deadline.hasTimeLeft() && !success) {
                                Future<Tuple2<Integer, Long>> future = 
getKvStateWithRetries(client,
@@ -630,16 +588,14 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
 
                final int numElements = 1024;
 
-               final QueryableStateClient client = new QueryableStateClient(
-                       cluster.configuration(),
-                       cluster.highAvailabilityServices());
+               final QueryableStateClient client = new 
QueryableStateClient(cluster.configuration());
 
                JobID jobId = null;
                try {
                        StreamExecutionEnvironment env =
                                
StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setStateBackend(stateBackend);
-                       env.setParallelism(NUM_SLOTS);
+                       env.setParallelism(maxParallelism);
                        // Very important, because cluster is shared between 
tests and we
                        // don't explicitly check that all slots are available 
before
                        // submitting.
@@ -721,15 +677,13 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
 
                final int numElements = 1024;
 
-               final QueryableStateClient client = new QueryableStateClient(
-                       cluster.configuration(),
-                       cluster.highAvailabilityServices());
+               final QueryableStateClient client = new 
QueryableStateClient(cluster.configuration());
 
                JobID jobId = null;
                try {
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setStateBackend(stateBackend);
-                       env.setParallelism(NUM_SLOTS);
+                       env.setParallelism(maxParallelism);
                        // Very important, because cluster is shared between 
tests and we
                        // don't explicitly check that all slots are available 
before
                        // submitting.
@@ -789,15 +743,13 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
 
                final int numElements = 1024;
 
-               final QueryableStateClient client = new QueryableStateClient(
-                       cluster.configuration(),
-                       cluster.highAvailabilityServices());
+               final QueryableStateClient client = new 
QueryableStateClient(cluster.configuration());
 
                JobID jobId = null;
                try {
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setStateBackend(stateBackend);
-                       env.setParallelism(NUM_SLOTS);
+                       env.setParallelism(maxParallelism);
                        // Very important, because cluster is shared between 
tests and we
                        // don't explicitly check that all slots are available 
before
                        // submitting.
@@ -833,7 +785,7 @@ public abstract class AbstractQueryableStateITCase extends 
TestLogger {
                        // Now query
                        String expected = Integer.toString(numElements * 
(numElements + 1) / 2);
 
-                       for (int key = 0; key < NUM_SLOTS; key++) {
+                       for (int key = 0; key < maxParallelism; key++) {
                                boolean success = false;
                                while (deadline.hasTimeLeft() && !success) {
                                        Future<String> future = 
getKvStateWithRetries(client,
@@ -884,15 +836,13 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
 
                final int numElements = 1024;
 
-               final QueryableStateClient client = new QueryableStateClient(
-                       cluster.configuration(),
-                       cluster.highAvailabilityServices());
+               final QueryableStateClient client = new 
QueryableStateClient(cluster.configuration());
 
                JobID jobId = null;
                try {
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setStateBackend(stateBackend);
-                       env.setParallelism(NUM_SLOTS);
+                       env.setParallelism(maxParallelism);
                        // Very important, because cluster is shared between 
tests and we
                        // don't explicitly check that all slots are available 
before
                        // submitting.
@@ -1099,7 +1049,7 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
        /**
         * Test source producing (key, 1) tuples with random key in key range 
(numKeys).
         */
-       private static class TestKeyRangeSource extends 
RichParallelSourceFunction<Tuple2<Integer, Long>>
+       protected static class TestKeyRangeSource extends 
RichParallelSourceFunction<Tuple2<Integer, Long>>
                        implements CheckpointListener {
                private static final long serialVersionUID = 
-5744725196953582710L;
 
@@ -1148,6 +1098,9 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
                }
        }
 
+       /**
+        * Test {@link FoldFunction} concatenating the already stored string 
with the long passed as argument.
+        */
        private static class SumFold implements FoldFunction<Tuple2<Integer, 
Long>, String> {
                private static final long serialVersionUID = 
-6249227626701264599L;
 
@@ -1159,7 +1112,10 @@ public abstract class AbstractQueryableStateITCase 
extends TestLogger {
                }
        }
 
-       private static class SumReduce implements 
ReduceFunction<Tuple2<Integer, Long>> {
+       /**
+        * Test {@link ReduceFunction} summing up its two arguments.
+        */
+       protected static class SumReduce implements 
ReduceFunction<Tuple2<Integer, Long>> {
                private static final long serialVersionUID = 
-8651235077342052336L;
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/HAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/HAAbstractQueryableStateITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/HAAbstractQueryableStateITCase.java
new file mode 100644
index 0000000..cd89e00
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/query/HAAbstractQueryableStateITCase.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.query;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+
+import org.apache.curator.test.TestingServer;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Base class with the cluster configuration for the tests on the HA mode.
+ */
+public abstract class HAAbstractQueryableStateITCase extends 
AbstractQueryableStateITCase {
+
+       private static final int NUM_JMS = 2;
+       private static final int NUM_TMS = 4;
+       private static final int NUM_SLOTS_PER_TM = 4;
+
+       private static TestingServer zkServer;
+       private static TemporaryFolder temporaryFolder;
+
+       @BeforeClass
+       public static void setup() {
+               try {
+                       zkServer = new TestingServer();
+                       temporaryFolder = new TemporaryFolder();
+                       temporaryFolder.create();
+
+                       Configuration config = new Configuration();
+                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
+                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
NUM_SLOTS_PER_TM);
+                       config.setBoolean(QueryableStateOptions.SERVER_ENABLE, 
true);
+                       
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
+                       
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
+                       
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
temporaryFolder.newFolder().toString());
+                       
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zkServer.getConnectString());
+                       config.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");
+
+                       cluster = new TestingCluster(config, false);
+                       cluster.start();
+
+                       testActorSystem = AkkaUtils.createDefaultActorSystem();
+
+                       // verify that we are in HA mode
+                       Assert.assertTrue(cluster.haMode() == 
HighAvailabilityMode.ZOOKEEPER);
+
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @AfterClass
+       public static void tearDown() {
+               if (cluster != null) {
+                       cluster.stop();
+                       cluster.awaitTermination();
+               }
+
+               testActorSystem.shutdown();
+               testActorSystem.awaitTermination();
+
+               try {
+                       zkServer.stop();
+                       zkServer.close();
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+
+               temporaryFolder.delete();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseFsBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseFsBackend.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseFsBackend.java
new file mode 100644
index 0000000..5d5b671
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseFsBackend.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.query;
+
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Several integration tests for queryable state using the {@link 
FsStateBackend}.
+ */
+public class HAQueryableStateITCaseFsBackend extends 
HAAbstractQueryableStateITCase {
+
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       @Override
+       protected AbstractStateBackend createStateBackend() throws Exception {
+               return new 
FsStateBackend(temporaryFolder.newFolder().toURI().toString());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseRocksDBBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseRocksDBBackend.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseRocksDBBackend.java
new file mode 100644
index 0000000..22570b5
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseRocksDBBackend.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.query;
+
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Several integration tests for queryable state using the {@link 
RocksDBStateBackend}.
+ */
+public class HAQueryableStateITCaseRocksDBBackend extends 
HAAbstractQueryableStateITCase {
+
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       @Override
+       protected AbstractStateBackend createStateBackend() throws Exception {
+               return new 
RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/NonHAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/NonHAAbstractQueryableStateITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/NonHAAbstractQueryableStateITCase.java
new file mode 100644
index 0000000..83f86e4
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/query/NonHAAbstractQueryableStateITCase.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.query;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Base class with the cluster configuration for the tests on the non-HA mode.
+ */
+public abstract class NonHAAbstractQueryableStateITCase extends 
AbstractQueryableStateITCase {
+
+       private static final int NUM_TMS = 2;
+       private static final int NUM_SLOTS_PER_TM = 4;
+
+       @BeforeClass
+       public static void setup() {
+               try {
+                       Configuration config = new Configuration();
+                       config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
4L);
+                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
NUM_SLOTS_PER_TM);
+                       
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
+                       config.setBoolean(QueryableStateOptions.SERVER_ENABLE, 
true);
+                       
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
+
+                       cluster = new TestingCluster(config, false);
+                       cluster.start(true);
+
+                       testActorSystem = AkkaUtils.createDefaultActorSystem();
+
+                       // verify that we are not in HA mode
+                       Assert.assertTrue(cluster.haMode() == 
HighAvailabilityMode.NONE);
+
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @AfterClass
+       public static void tearDown() {
+               try {
+                       cluster.shutdown();
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+
+               if (testActorSystem != null) {
+                       testActorSystem.shutdown();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseFsBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseFsBackend.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseFsBackend.java
new file mode 100644
index 0000000..d4dbe83
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseFsBackend.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.query;
+
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Several integration tests for queryable state using the {@link 
FsStateBackend}.
+ */
+public class NonHAQueryableStateITCaseFsBackend extends 
NonHAAbstractQueryableStateITCase {
+
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       @Override
+       protected AbstractStateBackend createStateBackend() throws Exception {
+               return new 
FsStateBackend(temporaryFolder.newFolder().toURI().toString());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseRocksDBBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseRocksDBBackend.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseRocksDBBackend.java
new file mode 100644
index 0000000..a15e6a4
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseRocksDBBackend.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.query;
+
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Several integration tests for queryable state using the {@link 
RocksDBStateBackend}.
+ */
+public class NonHAQueryableStateITCaseRocksDBBackend extends 
NonHAAbstractQueryableStateITCase {
+
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       @Override
+       protected AbstractStateBackend createStateBackend() throws Exception {
+               return new 
RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseFsBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseFsBackend.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseFsBackend.java
deleted file mode 100644
index b91d277..0000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseFsBackend.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.query;
-
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Several integration tests for queryable state using the {@link 
FsStateBackend}.
- */
-public class QueryableStateITCaseFsBackend extends 
AbstractQueryableStateITCase {
-
-       @Rule
-       public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-       @Override
-       protected AbstractStateBackend createStateBackend() throws Exception {
-               return new 
FsStateBackend(temporaryFolder.newFolder().toURI().toString());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseMemoryBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseMemoryBackend.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseMemoryBackend.java
deleted file mode 100644
index 312970e..0000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseMemoryBackend.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.query;
-
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-
-/**
- * Several integration tests for queryable state using the {@link 
MemoryStateBackend}.
- */
-public class QueryableStateITCaseMemoryBackend extends 
AbstractQueryableStateITCase {
-
-       @Override
-       protected AbstractStateBackend createStateBackend() throws Exception {
-               return new MemoryStateBackend();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseRocksDBBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseRocksDBBackend.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseRocksDBBackend.java
deleted file mode 100644
index 9547c5a..0000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseRocksDBBackend.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.query;
-
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Several integration tests for queryable state using the {@link 
RocksDBStateBackend}.
- */
-public class QueryableStateITCaseRocksDBBackend extends 
AbstractQueryableStateITCase {
-
-       @Rule
-       public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-       @Override
-       protected AbstractStateBackend createStateBackend() throws Exception {
-               return new 
RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
-       }
-}

Reply via email to