[FLINK-8703][tests] Port KafkaTestBase to MiniClusterResource

This closes #5669.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84ad2cd4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84ad2cd4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84ad2cd4 Branch: refs/heads/release-1.5 Commit: 84ad2cd4b13db2dbe4a144aa2e3a2802e79f77b9 Parents: 69b8a92 Author: zentol <ches...@apache.org> Authored: Wed Mar 7 13:39:25 2018 +0100 Committer: zentol <ches...@apache.org> Committed: Fri Apr 6 15:24:03 2018 +0200 ---------------------------------------------------------------------- .../connectors/kafka/KafkaConsumerTestBase.java | 219 +++++++++++-------- .../connectors/kafka/KafkaTestBase.java | 25 +-- .../testutils/ClusterCommunicationUtils.java | 56 +++++ .../testutils/JobManagerCommunicationUtils.java | 147 ------------- 4 files changed, 186 insertions(+), 261 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/84ad2cd4/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 959d6f1..6ed9143 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import 
org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; @@ -34,6 +35,7 @@ import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TypeInfoParser; +import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.DataInputView; @@ -42,7 +44,8 @@ import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.client.JobCancellationException; import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; @@ -54,11 +57,11 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators; import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper; -import 
org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils; import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper; import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper; import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2FlinkPartitioner; @@ -72,6 +75,9 @@ import org.apache.flink.test.util.SuccessException; import org.apache.flink.testutils.junit.RetryOnException; import org.apache.flink.testutils.junit.RetryRule; import org.apache.flink.util.Collector; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; @@ -106,10 +112,12 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.getRunningJobs; +import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.waitUntilJobIsRunning; +import static org.apache.flink.streaming.connectors.kafka.testutils.ClusterCommunicationUtils.waitUntilNoJobIsRunning; import static org.apache.flink.test.util.TestUtils.tryExecute; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -123,6 +131,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { @Rule public RetryRule retryRule = new RetryRule(); + private ClusterClient<?> client; + // ------------------------------------------------------------------------ // Common Test Preparation // ------------------------------------------------------------------------ @@ -132,8 +142,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { * the same mini 
cluster. Otherwise, missing slots may happen. */ @Before - public void ensureNoJobIsLingering() throws Exception { - JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout)); + public void setClientAndEnsureNoJobIsLingering() throws Exception { + client = flink.getClusterClient(); + waitUntilNoJobIsRunning(client); } // ------------------------------------------------------------------------ @@ -244,7 +255,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { while (System.nanoTime() < deadline); // cancel the job & wait for the job to finish - JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout)); + client.cancel(Iterables.getOnlyElement(getRunningJobs(client))); runner.join(); final Throwable t = errorRef.get(); @@ -330,7 +341,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { while (System.nanoTime() < deadline); // cancel the job & wait for the job to finish - JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout)); + client.cancel(Iterables.getOnlyElement(getRunningJobs(client))); runner.join(); final Throwable t = errorRef.get(); @@ -443,14 +454,18 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { }).setParallelism(1) .addSink(new DiscardingSink<>()); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()); + final JobID consumeJobId = jobGraph.getJobID(); + final AtomicReference<Throwable> error = new AtomicReference<>(); Thread consumeThread = new Thread(new Runnable() { @Override public void run() { try { - env.execute(consumeExtraRecordsJobName); + client.setDetached(false); + client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader()); } catch (Throwable t) { - if (!(t instanceof JobCancellationException)) { + if (!ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) { error.set(t); } } @@ -459,9 +474,7 @@ public abstract class KafkaConsumerTestBase 
extends KafkaTestBase { consumeThread.start(); // wait until the consuming job has started, to be extra safe - JobManagerCommunicationUtils.waitUntilJobIsRunning( - flink.getLeaderGateway(timeout), - consumeExtraRecordsJobName); + waitUntilJobIsRunning(client); // setup the extra records writing job final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -500,9 +513,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { } // cancel the consume job after all extra records are written - JobManagerCommunicationUtils.cancelCurrentJob( - flink.getLeaderGateway(timeout), - consumeExtraRecordsJobName); + client.cancel(consumeJobId); consumeThread.join(); kafkaOffsetHandler.close(); @@ -989,23 +1000,27 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { final AtomicReference<Throwable> jobError = new AtomicReference<>(); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(parallelism); + env.enableCheckpointing(100); + env.getConfig().disableSysoutLogging(); + + Properties props = new Properties(); + props.putAll(standardProps); + props.putAll(secureProps); + FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props); + + env.addSource(source).addSink(new DiscardingSink<String>()); + + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()); + final JobID jobId = jobGraph.getJobID(); + final Runnable jobRunner = new Runnable() { @Override public void run() { try { - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(parallelism); - env.enableCheckpointing(100); - env.getConfig().disableSysoutLogging(); - - Properties props = new Properties(); - props.putAll(standardProps); - props.putAll(secureProps); - FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), 
props); - - env.addSource(source).addSink(new DiscardingSink<String>()); - - env.execute("Runner for CancelingOnFullInputTest"); + client.setDetached(false); + client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader()); } catch (Throwable t) { jobError.set(t); @@ -1026,14 +1041,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { } // cancel - JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout), "Runner for CancelingOnFullInputTest"); + client.cancel(jobId); // wait for the program to be done and validate that we failed with the right exception runnerThread.join(); - failueCause = jobError.get(); - assertNotNull("program did not fail properly due to canceling", failueCause); - assertTrue(failueCause.getMessage().contains("Job was cancelled")); + assertEquals(JobStatus.CANCELED, client.getJobStatus(jobId).get()); if (generator.isAlive()) { generator.shutdown(); @@ -1063,23 +1076,27 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { final AtomicReference<Throwable> error = new AtomicReference<>(); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(parallelism); + env.enableCheckpointing(100); + env.getConfig().disableSysoutLogging(); + + Properties props = new Properties(); + props.putAll(standardProps); + props.putAll(secureProps); + FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props); + + env.addSource(source).addSink(new DiscardingSink<String>()); + + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()); + final JobID jobId = jobGraph.getJobID(); + final Runnable jobRunner = new Runnable() { @Override public void run() { try { - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(parallelism); - env.enableCheckpointing(100); - env.getConfig().disableSysoutLogging(); - - 
Properties props = new Properties(); - props.putAll(standardProps); - props.putAll(secureProps); - FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props); - - env.addSource(source).addSink(new DiscardingSink<String>()); - - env.execute("CancelingOnEmptyInputTest"); + client.setDetached(false); + client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader()); } catch (Throwable t) { LOG.error("Job Runner failed with exception", t); @@ -1100,14 +1117,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { Assert.fail("Test failed prematurely with: " + failueCause.getMessage()); } // cancel - JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout)); + client.cancel(jobId); // wait for the program to be done and validate that we failed with the right exception runnerThread.join(); - failueCause = error.get(); - assertNotNull("program did not fail properly due to canceling", failueCause); - assertTrue(failueCause.getMessage().contains("Job was cancelled")); + assertEquals(JobStatus.CANCELED, client.getJobStatus(jobId).get()); deleteTestTopic(topic); } @@ -1558,52 +1573,53 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { createTestTopic(topic, 5, 1); final Tuple1<Throwable> error = new Tuple1<>(null); - Runnable job = new Runnable() { + + // start job writing & reading data. 
+ final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment(); + env1.setParallelism(1); + env1.getConfig().setRestartStrategy(RestartStrategies.noRestart()); + env1.getConfig().disableSysoutLogging(); + env1.disableOperatorChaining(); // let the source read everything into the network buffers + + TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig()); + DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps)); + fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() { @Override - public void run() { - try { - // start job writing & reading data. - final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment(); - env1.setParallelism(1); - env1.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - env1.getConfig().disableSysoutLogging(); - env1.disableOperatorChaining(); // let the source read everything into the network buffers - - Properties props = new Properties(); - props.putAll(standardProps); - props.putAll(secureProps); - - TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>"), env1.getConfig()); - DataStream<Tuple2<Integer, Integer>> fromKafka = env1.addSource(kafkaServer.getConsumer(topic, schema, standardProps)); - fromKafka.flatMap(new FlatMapFunction<Tuple2<Integer, Integer>, Void>() { - @Override - public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {// no op - } - }); - - DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() { - boolean running = true; - - @Override - public void run(SourceContext<Tuple2<Integer, Integer>> ctx) 
throws Exception { - int i = 0; - while (running) { - ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask())); - Thread.sleep(1); - } - } + public void flatMap(Tuple2<Integer, Integer> value, Collector<Void> out) throws Exception {// no op + } + }); - @Override - public void cancel() { - running = false; - } - }); + DataStream<Tuple2<Integer, Integer>> fromGen = env1.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() { + boolean running = true; + + @Override + public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { + int i = 0; + while (running) { + ctx.collect(Tuple2.of(i++, getRuntimeContext().getIndexOfThisSubtask())); + Thread.sleep(1); + } + } + + @Override + public void cancel() { + running = false; + } + }); + + kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null); - kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env1.getStreamGraph()); + final JobID jobId = jobGraph.getJobID(); - env1.execute("Metrics test job"); + Runnable job = new Runnable() { + @Override + public void run() { + try { + client.setDetached(false); + client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader()); } catch (Throwable t) { - if (!(t instanceof JobCancellationException)) { // we'll cancel the job + if (!ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) { LOG.warn("Got exception during execution", t); error.f0 = t; } @@ -1653,7 +1669,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { LOG.info("Found all JMX metrics. 
Cancelling job."); } finally { // cancel - JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout)); + client.cancel(jobId); // wait for the job to finish (it should due to the cancel command above) jobThread.join(); } @@ -1903,7 +1919,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { catch (Exception e) { LOG.error("Write attempt failed, trying again", e); deleteTestTopic(topicName); - JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout)); + waitUntilNoJobIsRunning(client); continue; } @@ -1914,7 +1930,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { // we need to validate the sequence, because kafka's producers are not exactly once LOG.info("Validating sequence"); - JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout)); + waitUntilNoJobIsRunning(client); if (validateSequence(topicName, parallelism, deserSchema, numElements)) { // everything is good! @@ -1996,7 +2012,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { // we need to validate the sequence, because kafka's producers are not exactly once LOG.info("Validating sequence"); - JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout)); + while (!getRunningJobs(client).isEmpty()){ + Thread.sleep(50); + } if (!validateSequence(topicName, parallelism, deserSchema, originalNumElements + numElementsToAppend)) { throw new Exception("Could not append a valid sequence to Kafka."); @@ -2040,13 +2058,20 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { final AtomicReference<Throwable> errorRef = new AtomicReference<>(); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(readEnv.getStreamGraph()); + final JobID jobId = jobGraph.getJobID(); + Thread runner = new Thread() { @Override public void run() { try { + client.setDetached(false); + client.submitJob(jobGraph, 
KafkaConsumerTestBase.class.getClassLoader()); tryExecute(readEnv, "sequence validation"); } catch (Throwable t) { - errorRef.set(t); + if (!ExceptionUtils.findThrowable(t, SuccessException.class).isPresent()) { + errorRef.set(t); + } } } }; @@ -2064,7 +2089,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { // did not finish in time, maybe the producer dropped one or more records and // the validation did not reach the exit point success = false; - JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout)); + client.cancel(jobId); } else { Throwable error = errorRef.get(); @@ -2077,7 +2102,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { } } - JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout)); + waitUntilNoJobIsRunning(client); return success; } http://git-wip-us.apache.org/repos/asf/flink/blob/84ad2cd4/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java index f471cd4..697e075 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -24,9 +24,9 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.metrics.jmx.JMXReporter; import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.util.TestStreamEnvironment; +import org.apache.flink.test.util.MiniClusterResource; import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.TestLogger; @@ -75,13 +75,17 @@ public abstract class KafkaTestBase extends TestLogger { protected static final int TM_SLOTS = 8; - protected static final int PARALLELISM = NUM_TMS * TM_SLOTS; - protected static String brokerConnectionStrings; protected static Properties standardProps; - protected static LocalFlinkMiniCluster flink; + @ClassRule + public static MiniClusterResource flink = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + getFlinkConfiguration(), + NUM_TMS, + TM_SLOTS), + true); protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS); @@ -107,8 +111,6 @@ public abstract class KafkaTestBase extends TestLogger { LOG.info("-------------------------------------------------------------------------"); startClusters(false, hideKafkaBehindProxy); - - TestStreamEnvironment.setAsContext(flink, PARALLELISM); } @AfterClass @@ -131,8 +133,6 @@ public abstract class KafkaTestBase extends TestLogger { Configuration flinkConfig = new Configuration(); flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "5 s"); flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1 s"); - flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); - flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS); flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L); flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); @@ -163,18 +163,9 @@ public abstract class KafkaTestBase extends TestLogger { } secureProps = kafkaServer.getSecureProperties(); } - - // start also a re-usable Flink mini cluster - flink = new LocalFlinkMiniCluster(getFlinkConfiguration(), false); - flink.start(); } protected static void shutdownClusters() throws Exception { - - if (flink != null) { - flink.stop(); - } - if (secureProps != null) { secureProps.clear(); } http://git-wip-us.apache.org/repos/asf/flink/blob/84ad2cd4/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ClusterCommunicationUtils.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ClusterCommunicationUtils.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ClusterCommunicationUtils.java new file mode 100644 index 0000000..41f9d1e --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ClusterCommunicationUtils.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.testutils; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.runtime.client.JobStatusMessage; + +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Utilities for communicating with a cluster through a {@link ClusterClient}. + */ +public class ClusterCommunicationUtils { + + public static void waitUntilJobIsRunning(ClusterClient<?> client) throws Exception { + while (getRunningJobs(client).isEmpty()) { + Thread.sleep(50); + } + } + + public static void waitUntilNoJobIsRunning(ClusterClient<?> client) throws Exception { + while (!getRunningJobs(client).isEmpty()) { + Thread.sleep(50); + } + } + + public static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception { + Collection<JobStatusMessage> statusMessages = client.listJobs().get(); + return statusMessages.stream() + .filter(status -> !status.getJobState().isGloballyTerminalState()) + .map(JobStatusMessage::getJobId) + .collect(Collectors.toList()); + } + + private ClusterCommunicationUtils() { + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/84ad2cd4/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java ---------------------------------------------------------------------- diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java deleted file mode 100644 index 9bbe1d3..0000000 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.testutils; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.client.JobStatusMessage; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.messages.JobManagerMessages; - -import java.util.List; -import java.util.concurrent.TimeUnit; - -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -/** - * Utilities for communicating with a jobmanager through a {@link ActorGateway}. 
- */ -public class JobManagerCommunicationUtils { - - private static final FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS); - - public static void waitUntilNoJobIsRunning(ActorGateway jobManager) throws Exception { - while (true) { - // find the jobID - Future<Object> listResponse = jobManager.ask( - JobManagerMessages.getRequestRunningJobsStatus(), askTimeout); - - Object result = Await.result(listResponse, askTimeout); - List<JobStatusMessage> jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages(); - - if (jobs.isEmpty()) { - return; - } - - Thread.sleep(50); - } - } - - public static void waitUntilJobIsRunning(ActorGateway jobManager, String name) throws Exception { - while (true) { - Future<Object> listResponse = jobManager.ask( - JobManagerMessages.getRequestRunningJobsStatus(), - askTimeout); - - List<JobStatusMessage> jobs; - try { - Object result = Await.result(listResponse, askTimeout); - jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages(); - } - catch (Exception e) { - throw new Exception("Could not wait for job to start - failed to retrieve running jobs from the JobManager.", e); - } - - // see if the running jobs contain the requested job - for (JobStatusMessage job : jobs) { - if (job.getJobName().equals(name)) { - return; - } - } - - Thread.sleep(50); - } - } - - public static void cancelCurrentJob(ActorGateway jobManager) throws Exception { - cancelCurrentJob(jobManager, null); - } - - public static void cancelCurrentJob(ActorGateway jobManager, String name) throws Exception { - JobStatusMessage status = null; - - for (int i = 0; i < 200; i++) { - // find the jobID - Future<Object> listResponse = jobManager.ask( - JobManagerMessages.getRequestRunningJobsStatus(), - askTimeout); - - List<JobStatusMessage> jobs; - try { - Object result = Await.result(listResponse, askTimeout); - jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages(); - } - catch (Exception e) { - throw 
new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e); - } - - if (jobs.isEmpty()) { - // try again, fall through the loop - Thread.sleep(50); - } - else if (jobs.size() == 1) { - status = jobs.get(0); - } - else if (name != null) { - for (JobStatusMessage msg: jobs) { - if (msg.getJobName().equals(name)) { - status = msg; - } - } - if (status == null) { - throw new Exception("Could not cancel job - no job matched expected name = '" + name + "' in " + jobs); - } - } else { - String jobNames = ""; - for (JobStatusMessage jsm: jobs) { - jobNames += jsm.getJobName() + ", "; - } - throw new Exception("Could not cancel job - more than one running job: " + jobNames); - } - } - - if (status == null) { - throw new Exception("Could not cancel job - no running jobs"); - } - else if (status.getJobState().isGloballyTerminalState()) { - throw new Exception("Could not cancel job - job is not running any more"); - } - - JobID jobId = status.getJobId(); - - Future<Object> response = jobManager.ask(new JobManagerMessages.CancelJob(jobId), askTimeout); - try { - Await.result(response, askTimeout); - } - catch (Exception e) { - throw new Exception("Sending the 'cancel' message failed.", e); - } - } -}