Repository: flink Updated Branches: refs/heads/master c685251ce -> e3b27edfc
[FLINK-7379] [qs] Remove HighAvailabilityServices from QS client constructor. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3b27edf Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3b27edf Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3b27edf Branch: refs/heads/master Commit: e3b27edfcc22c1157fa16a8f290636fc48bfe142 Parents: c685251 Author: kkloudas <kklou...@gmail.com> Authored: Mon Jul 31 13:35:24 2017 +0200 Committer: kkloudas <kklou...@gmail.com> Committed: Fri Aug 11 10:58:16 2017 +0200 ---------------------------------------------------------------------- .../runtime/query/QueryableStateClient.java | 20 +++ .../query/AbstractQueryableStateITCase.java | 148 +++++++------------ .../query/HAAbstractQueryableStateITCase.java | 102 +++++++++++++ .../query/HAQueryableStateITCaseFsBackend.java | 39 +++++ .../HAQueryableStateITCaseRocksDBBackend.java | 39 +++++ .../NonHAAbstractQueryableStateITCase.java | 81 ++++++++++ .../NonHAQueryableStateITCaseFsBackend.java | 39 +++++ ...NonHAQueryableStateITCaseRocksDBBackend.java | 39 +++++ .../query/QueryableStateITCaseFsBackend.java | 39 ----- .../QueryableStateITCaseMemoryBackend.java | 34 ----- .../QueryableStateITCaseRocksDBBackend.java | 39 ----- 11 files changed, 411 insertions(+), 208 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java index 1b1c8f8..4ba6929 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java @@ -31,7 +31,9 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.QueryableStateOptions; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.FixedDelayLookupRetryStrategyFactory; import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.LookupRetryStrategyFactory; @@ -116,9 +118,27 @@ public class QueryableStateClient { * system and another for the network client. * * @param config Configuration to use. + * @throws Exception Failures are forwarded + */ + public QueryableStateClient(Configuration config) throws Exception { + this(config, HighAvailabilityServicesUtils.createHighAvailabilityServices( + config, Executors.directExecutor(), HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION)); + } + + /** + * Creates a client from the given configuration. + * + * <p>This will create multiple Thread pools: one for the started actor + * system and another for the network client. + * + * @param config Configuration to use. * @param highAvailabilityServices Service factory for high availability services * @throws Exception Failures are forwarded + * + * @deprecated This constructor is deprecated and stays only for backwards compatibility. Use the + * {@link #QueryableStateClient(Configuration)} instead. */ + @Deprecated public QueryableStateClient( Configuration config, HighAvailabilityServices highAvailabilityServices) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java index 21e3b4c..8ac3d2f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java @@ -34,21 +34,18 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.QueryableStateOptions; -import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess; -import org.apache.flink.runtime.messages.JobManagerMessages.JobFound; +import org.apache.flink.runtime.minicluster.FlinkMiniCluster; import org.apache.flink.runtime.query.QueryableStateClient; import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceTypeInfo; -import org.apache.flink.runtime.testingUtils.TestingCluster; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.QueryableStateStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -61,9 +58,8 @@ import akka.dispatch.Futures; import akka.dispatch.OnSuccess; import akka.dispatch.Recover; import akka.pattern.Patterns; -import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import java.util.ArrayList; @@ -80,25 +76,18 @@ import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; import scala.reflect.ClassTag$; -import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.JobStatusIs; -import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobStatus; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** * Base class for queryable state integration tests with a configurable state backend. */ public abstract class AbstractQueryableStateITCase extends TestLogger { - private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS); + protected static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10000, TimeUnit.SECONDS); private static final FiniteDuration QUERY_RETRY_DELAY = new FiniteDuration(100, TimeUnit.MILLISECONDS); - private static ActorSystem testActorSystem; - - private static final int NUM_TMS = 2; - private static final int NUM_SLOTS_PER_TM = 4; - private static final int NUM_SLOTS = NUM_TMS * NUM_SLOTS_PER_TM; + protected static ActorSystem testActorSystem; /** * State backend to use. @@ -109,47 +98,19 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { * Shared between all the test. Make sure to have at least NUM_SLOTS * available after your test finishes, e.g. cancel the job you submitted. */ - private static TestingCluster cluster; - - @BeforeClass - public static void setup() { - try { - Configuration config = new Configuration(); - config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); - config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1); - config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true); - config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1); - - cluster = new TestingCluster(config, false); - cluster.start(true); - - testActorSystem = AkkaUtils.createDefaultActorSystem(); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } + protected static FlinkMiniCluster cluster; - @AfterClass - public static void tearDown() { - try { - cluster.shutdown(); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - - if (testActorSystem != null) { - testActorSystem.shutdown(); - } - } + protected static int maxParallelism; @Before public void setUp() throws Exception { // NOTE: do not use a shared instance for all tests as the tests may brake this.stateBackend = createStateBackend(); + + Assert.assertNotNull(cluster); + + maxParallelism = cluster.configuration().getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) * + cluster.configuration().getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); } /** @@ -175,9 +136,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { final Deadline deadline = TEST_TIMEOUT.fromNow(); final int numKeys = 256; - final QueryableStateClient client = new QueryableStateClient( - cluster.configuration(), - cluster.highAvailabilityServices()); + final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); JobID jobId = null; @@ -187,7 +146,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(stateBackend); - env.setParallelism(NUM_SLOTS); + env.setParallelism(maxParallelism); // Very important, because cluster is shared between tests and we // don't explicitly check that all slots are available before // submitting. @@ -293,10 +252,13 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { /** * Tests that duplicate query registrations fail the job at the JobManager. + * + * <b>NOTE: </b> This test is only in the non-HA variant of the tests because + * in the HA mode we use the actual JM code which does not recognize the + * {@code NotifyWhenJobStatus} message. * */ @Test public void testDuplicateRegistrationFailsJob() throws Exception { - // Config final Deadline deadline = TEST_TIMEOUT.fromNow(); final int numKeys = 256; @@ -308,7 +270,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(stateBackend); - env.setParallelism(NUM_SLOTS); + env.setParallelism(maxParallelism); // Very important, because cluster is shared between tests and we // don't explicitly check that all slots are available before // submitting. @@ -349,21 +311,21 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { JobGraph jobGraph = env.getStreamGraph().getJobGraph(); jobId = jobGraph.getJobID(); - Future<JobStatusIs> failedFuture = cluster + Future<TestingJobManagerMessages.JobStatusIs> failedFuture = cluster .getLeaderGateway(deadline.timeLeft()) - .ask(new NotifyWhenJobStatus(jobId, JobStatus.FAILED), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<JobStatusIs>apply(JobStatusIs.class)); + .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.FAILED), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)); cluster.submitJobDetached(jobGraph); - JobStatusIs jobStatus = Await.result(failedFuture, deadline.timeLeft()); + TestingJobManagerMessages.JobStatusIs jobStatus = Await.result(failedFuture, deadline.timeLeft()); assertEquals(JobStatus.FAILED, jobStatus.state()); // Get the job and check the cause - JobFound jobFound = Await.result( + JobManagerMessages.JobFound jobFound = Await.result( cluster.getLeaderGateway(deadline.timeLeft()) .ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<JobFound>apply(JobFound.class)), + .mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)), deadline.timeLeft()); String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString(); @@ -376,10 +338,10 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { } finally { // Free cluster resources if (jobId != null) { - Future<CancellationSuccess> cancellation = cluster + Future<JobManagerMessages.CancellationSuccess> cancellation = cluster .getLeaderGateway(deadline.timeLeft()) .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)); + .mapTo(ClassTag$.MODULE$.<JobManagerMessages.CancellationSuccess>apply(JobManagerMessages.CancellationSuccess.class)); Await.ready(cancellation, deadline.timeLeft()); } @@ -399,15 +361,13 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { final int numElements = 1024; - final QueryableStateClient client = new QueryableStateClient( - cluster.configuration(), - cluster.highAvailabilityServices()); + final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); JobID jobId = null; try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(stateBackend); - env.setParallelism(NUM_SLOTS); + env.setParallelism(maxParallelism); // Very important, because cluster is shared between tests and we // don't explicitly check that all slots are available before // submitting. @@ -467,15 +427,13 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { final int numElements = 1024; - final QueryableStateClient client = new QueryableStateClient( - cluster.configuration(), - cluster.highAvailabilityServices()); + final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); JobID jobId = null; try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(stateBackend); - env.setParallelism(NUM_SLOTS); + env.setParallelism(maxParallelism); // Very important, because cluster is shared between tests and we // don't explicitly check that all slots are available before // submitting. @@ -536,7 +494,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { } /** - * Retry a query for state for keys between 0 and {@link #NUM_SLOTS} until + * Retry a query for state for keys between 0 and {@link #maxParallelism} until * <tt>expected</tt> equals the value of the result tuple's second field. */ private void executeQuery( @@ -547,7 +505,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { final StateDescriptor<?, Tuple2<Integer, Long>> stateDescriptor, final long expected) throws Exception { - for (int key = 0; key < NUM_SLOTS; key++) { + for (int key = 0; key < maxParallelism; key++) { boolean success = false; while (deadline.hasTimeLeft() && !success) { Future<Tuple2<Integer, Long>> future = getKvStateWithRetries(client, @@ -575,7 +533,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { } /** - * Retry a query for state for keys between 0 and {@link #NUM_SLOTS} until + * Retry a query for state for keys between 0 and {@link #maxParallelism} until * <tt>expected</tt> equals the value of the result tuple's second field. */ private void executeQuery( @@ -586,7 +544,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { final TypeSerializer<Tuple2<Integer, Long>> valueSerializer, final long expected) throws Exception { - for (int key = 0; key < NUM_SLOTS; key++) { + for (int key = 0; key < maxParallelism; key++) { boolean success = false; while (deadline.hasTimeLeft() && !success) { Future<Tuple2<Integer, Long>> future = getKvStateWithRetries(client, @@ -630,16 +588,14 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { final int numElements = 1024; - final QueryableStateClient client = new QueryableStateClient( - cluster.configuration(), - cluster.highAvailabilityServices()); + final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); JobID jobId = null; try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(stateBackend); - env.setParallelism(NUM_SLOTS); + env.setParallelism(maxParallelism); // Very important, because cluster is shared between tests and we // don't explicitly check that all slots are available before // submitting. @@ -721,15 +677,13 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { final int numElements = 1024; - final QueryableStateClient client = new QueryableStateClient( - cluster.configuration(), - cluster.highAvailabilityServices()); + final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); JobID jobId = null; try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(stateBackend); - env.setParallelism(NUM_SLOTS); + env.setParallelism(maxParallelism); // Very important, because cluster is shared between tests and we // don't explicitly check that all slots are available before // submitting. @@ -789,15 +743,13 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { final int numElements = 1024; - final QueryableStateClient client = new QueryableStateClient( - cluster.configuration(), - cluster.highAvailabilityServices()); + final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); JobID jobId = null; try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(stateBackend); - env.setParallelism(NUM_SLOTS); + env.setParallelism(maxParallelism); // Very important, because cluster is shared between tests and we // don't explicitly check that all slots are available before // submitting. @@ -833,7 +785,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { // Now query String expected = Integer.toString(numElements * (numElements + 1) / 2); - for (int key = 0; key < NUM_SLOTS; key++) { + for (int key = 0; key < maxParallelism; key++) { boolean success = false; while (deadline.hasTimeLeft() && !success) { Future<String> future = getKvStateWithRetries(client, @@ -884,15 +836,13 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { final int numElements = 1024; - final QueryableStateClient client = new QueryableStateClient( - cluster.configuration(), - cluster.highAvailabilityServices()); + final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); JobID jobId = null; try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(stateBackend); - env.setParallelism(NUM_SLOTS); + env.setParallelism(maxParallelism); // Very important, because cluster is shared between tests and we // don't explicitly check that all slots are available before // submitting. @@ -1099,7 +1049,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { /** * Test source producing (key, 1) tuples with random key in key range (numKeys). */ - private static class TestKeyRangeSource extends RichParallelSourceFunction<Tuple2<Integer, Long>> + protected static class TestKeyRangeSource extends RichParallelSourceFunction<Tuple2<Integer, Long>> implements CheckpointListener { private static final long serialVersionUID = -5744725196953582710L; @@ -1148,6 +1098,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { } } + /** + * Test {@link FoldFunction} concatenating the already stored string with the long passed as argument. + */ private static class SumFold implements FoldFunction<Tuple2<Integer, Long>, String> { private static final long serialVersionUID = -6249227626701264599L; @@ -1159,7 +1112,10 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { } } - private static class SumReduce implements ReduceFunction<Tuple2<Integer, Long>> { + /** + * Test {@link ReduceFunction} summing up its two arguments. + */ + protected static class SumReduce implements ReduceFunction<Tuple2<Integer, Long>> { private static final long serialVersionUID = -8651235077342052336L; @Override http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/HAAbstractQueryableStateITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/HAAbstractQueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/HAAbstractQueryableStateITCase.java new file mode 100644 index 0000000..cd89e00 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/query/HAAbstractQueryableStateITCase.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.query; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.QueryableStateOptions; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.testingUtils.TestingCluster; + +import org.apache.curator.test.TestingServer; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.rules.TemporaryFolder; + +import static org.junit.Assert.fail; + +/** + * Base class with the cluster configuration for the tests on the NON-HA mode. + */ +public abstract class HAAbstractQueryableStateITCase extends AbstractQueryableStateITCase { + + private static final int NUM_JMS = 2; + private static final int NUM_TMS = 4; + private static final int NUM_SLOTS_PER_TM = 4; + + private static TestingServer zkServer; + private static TemporaryFolder temporaryFolder; + + @BeforeClass + public static void setup() { + try { + zkServer = new TestingServer(); + temporaryFolder = new TemporaryFolder(); + temporaryFolder.create(); + + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); + config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true); + config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2); + config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString()); + config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString()); + config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); + + cluster = new TestingCluster(config, false); + cluster.start(); + + testActorSystem = AkkaUtils.createDefaultActorSystem(); + + // verify that we are in HA mode + Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.ZOOKEEPER); + + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @AfterClass + public static void tearDown() { + if (cluster != null) { + cluster.stop(); + cluster.awaitTermination(); + } + + testActorSystem.shutdown(); + testActorSystem.awaitTermination(); + + try { + zkServer.stop(); + zkServer.close(); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + temporaryFolder.delete(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseFsBackend.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseFsBackend.java b/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseFsBackend.java new file mode 100644 index 0000000..5d5b671 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseFsBackend.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.query; + +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; + +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; + +/** + * Several integration tests for queryable state using the {@link FsStateBackend}. + */ +public class HAQueryableStateITCaseFsBackend extends HAAbstractQueryableStateITCase { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Override + protected AbstractStateBackend createStateBackend() throws Exception { + return new FsStateBackend(temporaryFolder.newFolder().toURI().toString()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseRocksDBBackend.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseRocksDBBackend.java b/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseRocksDBBackend.java new file mode 100644 index 0000000..22570b5 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseRocksDBBackend.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.query; + +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; + +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; + +/** + * Several integration tests for queryable state using the {@link RocksDBStateBackend}. + */ +public class HAQueryableStateITCaseRocksDBBackend extends HAAbstractQueryableStateITCase { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Override + protected AbstractStateBackend createStateBackend() throws Exception { + return new RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/NonHAAbstractQueryableStateITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/NonHAAbstractQueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/NonHAAbstractQueryableStateITCase.java new file mode 100644 index 0000000..83f86e4 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/query/NonHAAbstractQueryableStateITCase.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.query; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.QueryableStateOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.testingUtils.TestingCluster; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; + +import static org.junit.Assert.fail; + +/** + * Base class with the cluster configuration for the tests on the HA mode. + */ +public abstract class NonHAAbstractQueryableStateITCase extends AbstractQueryableStateITCase { + + private static final int NUM_TMS = 2; + private static final int NUM_SLOTS_PER_TM = 4; + + @BeforeClass + public static void setup() { + try { + Configuration config = new Configuration(); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); + config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1); + config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true); + config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1); + + cluster = new TestingCluster(config, false); + cluster.start(true); + + testActorSystem = AkkaUtils.createDefaultActorSystem(); + + // verify that we are not in HA mode + Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.NONE); + + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @AfterClass + public static void tearDown() { + try { + cluster.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + if (testActorSystem != null) { + testActorSystem.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseFsBackend.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseFsBackend.java b/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseFsBackend.java new file mode 100644 index 0000000..d4dbe83 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseFsBackend.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.query; + +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; + +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; + +/** + * Several integration tests for queryable state using the {@link FsStateBackend}. + */ +public class NonHAQueryableStateITCaseFsBackend extends NonHAAbstractQueryableStateITCase { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Override + protected AbstractStateBackend createStateBackend() throws Exception { + return new FsStateBackend(temporaryFolder.newFolder().toURI().toString()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseRocksDBBackend.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseRocksDBBackend.java b/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseRocksDBBackend.java new file mode 100644 index 0000000..a15e6a4 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseRocksDBBackend.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.query; + +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; + +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; + +/** + * Several integration tests for queryable state using the {@link RocksDBStateBackend}. + */ +public class NonHAQueryableStateITCaseRocksDBBackend extends NonHAAbstractQueryableStateITCase { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Override + protected AbstractStateBackend createStateBackend() throws Exception { + return new RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseFsBackend.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseFsBackend.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseFsBackend.java deleted file mode 100644 index b91d277..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseFsBackend.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.query; - -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; - -import org.junit.Rule; -import org.junit.rules.TemporaryFolder; - -/** - * Several integration tests for queryable state using the {@link FsStateBackend}. - */ -public class QueryableStateITCaseFsBackend extends AbstractQueryableStateITCase { - - @Rule - public TemporaryFolder temporaryFolder = new TemporaryFolder(); - - @Override - protected AbstractStateBackend createStateBackend() throws Exception { - return new FsStateBackend(temporaryFolder.newFolder().toURI().toString()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseMemoryBackend.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseMemoryBackend.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseMemoryBackend.java deleted file mode 100644 index 312970e..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseMemoryBackend.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.query; - -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; - -/** - * Several integration tests for queryable state using the {@link MemoryStateBackend}. - */ -public class QueryableStateITCaseMemoryBackend extends AbstractQueryableStateITCase { - - @Override - protected AbstractStateBackend createStateBackend() throws Exception { - return new MemoryStateBackend(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3b27edf/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseRocksDBBackend.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseRocksDBBackend.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseRocksDBBackend.java deleted file mode 100644 index 9547c5a..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseRocksDBBackend.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.query; - -import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; - -import org.junit.Rule; -import org.junit.rules.TemporaryFolder; - -/** - * Several integration tests for queryable state using the {@link RocksDBStateBackend}. - */ -public class QueryableStateITCaseRocksDBBackend extends AbstractQueryableStateITCase { - - @Rule - public TemporaryFolder temporaryFolder = new TemporaryFolder(); - - @Override - protected AbstractStateBackend createStateBackend() throws Exception { - return new RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString()); - } -}