[GitHub] flink pull request #5669: [FLINK-8703][tests] Port KafkaTestBase to MiniClus...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5669 ---
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5669#discussion_r179332025 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java --- @@ -1063,23 +1078,27 @@ public void runCancelingOnEmptyInputTest() throws Exception { final AtomicReference error = new AtomicReference<>(); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); --- End diff -- As you wish. I can open a follow-up, since it's a trivial fixup. ---
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5669#discussion_r178769136 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java --- @@ -1063,23 +1078,27 @@ public void runCancelingOnEmptyInputTest() throws Exception { final AtomicReference error = new AtomicReference<>(); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); --- End diff -- That would go further than porting the test; I'd rather not do that in this PR. ---
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5669#discussion_r177747320 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java --- @@ -1063,23 +1078,27 @@ public void runCancelingOnEmptyInputTest() throws Exception { final AtomicReference error = new AtomicReference<>(); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); --- End diff --

Please unify `runCancelingOnFullInputTest` and `runCancelingOnEmptyInputTest` by extracting their common code, for example into:

```
private void runCancelingTest(boolean emptyInput) throws Exception {
    final String topic = emptyInput ? "cancelingOnEmptyInputTopic" : "cancelingOnFullTopic";

    final int parallelism = 3;
    createTestTopic(topic, parallelism, 1);

    // launch a producer thread
    DataGenerators.InfiniteStringsGenerator generator =
        new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic);
    if (!emptyInput) {
        generator.start();
    }

    // launch a consumer asynchronously
    final AtomicReference<Throwable> jobError = new AtomicReference<>();

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(parallelism);
    env.enableCheckpointing(100);
    env.getConfig().disableSysoutLogging();

    Properties props = new Properties();
    props.putAll(standardProps);
    props.putAll(secureProps);
    FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);

    env.addSource(source).addSink(new DiscardingSink<String>());

    JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
    final JobID jobId = jobGraph.getJobID();

    final Runnable jobRunner = new Runnable() {
        @Override
        public void run() {
            try {
                client.setDetached(false);
                client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader());
            } catch (Throwable t) {
                jobError.set(t);
            }
        }
    };

    Thread runnerThread = new Thread(jobRunner, "program runner thread");
    runnerThread.start();

    // wait a bit before canceling
    Thread.sleep(2000);

    Throwable failureCause = jobError.get();
    if (failureCause != null) {
        failureCause.printStackTrace();
        Assert.fail("Test failed prematurely with: " + failureCause.getMessage());
    }

    // cancel
    client.cancel(jobId);

    // wait for the program to be done and validate that we failed with the right exception
    runnerThread.join();

    assertEquals(JobStatus.CANCELED, client.getJobStatus(jobId).get());

    if (generator.isAlive()) {
        generator.shutdown();
        generator.join();
    } else if (!emptyInput) {
        Throwable t = generator.getError();
        if (t != null) {
            t.printStackTrace();
            fail("Generator failed: " + t.getMessage());
        } else {
            fail("Generator failed with no exception");
        }
    }

    deleteTestTopic(topic);
}
```
---
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5669#discussion_r177742687 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java --- @@ -244,7 +255,7 @@ public void run() { while (System.nanoTime() < deadline); // cancel the job & wait for the job to finish - JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout)); + client.cancel(getRunningJobs(client).get(0)); --- End diff -- `Iterables.getOnlyElement(getRunningJobs(client))`? ---
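For context on the suggestion above: Guava's `Iterables.getOnlyElement` fails when the collection does not hold exactly one element, whereas `get(0)` silently picks the first of several running jobs. The sketch below illustrates that contract with the standard library only. `SingleJob` and `onlyElement` are hypothetical names used for illustration; they are not part of Flink or Guava.

```java
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical stand-in that mirrors the "exactly one element" contract. */
final class SingleJob {

    private SingleJob() {}

    /** Returns the single element of {@code items}, or throws if there are zero or several. */
    static <T> T onlyElement(Collection<T> items) {
        Iterator<T> it = items.iterator();
        if (!it.hasNext()) {
            throw new NoSuchElementException("expected exactly one element, found none");
        }
        T first = it.next();
        if (it.hasNext()) {
            throw new IllegalArgumentException(
                "expected exactly one element, found " + items.size());
        }
        return first;
    }
}
```

In a test, this kind of check turns an unexpected second running job into an immediate failure rather than cancelling an arbitrary one.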
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5669#discussion_r177743725 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java --- @@ -132,8 +140,11 @@ * the same mini cluster. Otherwise, missing slots may happen. */ @Before - public void ensureNoJobIsLingering() throws Exception { - JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout)); + public void setClientAndEnsureNoJobIsLingering() throws Exception { + client = flink.getClusterClient(); + while (!getRunningJobs(client).isEmpty()){ --- End diff --

```
while (!getRunningJobs(client).isEmpty()){
    Thread.sleep(50);
}
```
This loop is copy-pasted a couple of times. Please extract it into a common method, maybe in an equivalent of `JobManagerCommunicationUtils` or even in some `TestingClusterClient`? ---
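One possible shape for the extracted helper requested above, as a minimal sketch. `JobPolling` and `waitUntilNoJobIsRunning` are hypothetical names, and the timeout parameter is an addition that the original loop does not have. The running-job lookup is passed in as a `Callable`, so the helper does not depend on any particular client API.

```java
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

/** Hypothetical test utility; not an existing Flink class. */
final class JobPolling {

    private JobPolling() {}

    /**
     * Polls {@code runningJobs} until it reports an empty collection.
     * Throws TimeoutException if jobs are still running after {@code timeoutMillis}.
     */
    static void waitUntilNoJobIsRunning(
            Callable<? extends Collection<?>> runningJobs,
            long pollIntervalMillis,
            long timeoutMillis) throws Exception {
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!runningJobs.call().isEmpty()) {
            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException(
                    "Jobs still running after " + timeoutMillis + " ms");
            }
            Thread.sleep(pollIntervalMillis);
        }
    }
}
```

Assuming `getRunningJobs(client)` returns a collection, as in the diff, a `@Before` method could then call something like `JobPolling.waitUntilNoJobIsRunning(() -> getRunningJobs(client), 50, 60_000)`, where the 60 second bound is an arbitrary illustrative value.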
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5669#discussion_r173455497 --- Diff: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java --- @@ -81,12 +81,6 @@ public void testCancelingEmptyTopic() throws Exception { public void testCancelingFullTopic() throws Exception { runCancelingOnFullInputTest(); } - --- End diff -- Ah right, I forgot to comment on that. I found these tests to be rather..._odd_. They check the behavior when not enough slots are available, but in the old code, afaik, this fails before the client even submits the job, and in flip6 it stalls, as we never check whether enough slots are available (I guess with the underlying assumption that we would just allocate more TMs until we have enough). ---
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5669#discussion_r173437603 --- Diff: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java --- @@ -81,12 +81,6 @@ public void testCancelingEmptyTopic() throws Exception { public void testCancelingFullTopic() throws Exception { runCancelingOnFullInputTest(); } - --- End diff -- Why are those removed? ---
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5669

[FLINK-8703][tests] Port KafkaTestBase to MiniClusterResource

## What is the purpose of the change

Ports the `KafkaTestBase` and extending classes to use `MiniClusterResource`.

## Verifying this change

Run all tests extending `KafkaTestBase` with `flip6` profile enabled/disabled.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8703_kafkaB

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5669.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5669

commit 545b4dcccf79ce1b8bb530bae77ab4c2b6e85351
Author: zentol
Date: 2018-03-07T12:38:03Z
Remove Kafka testFailOnDeploy test

commit 3cdfd906cdefa85fe36b6717f41e831ca7e5ea72
Author: zentol
Date: 2018-03-07T12:39:25Z
[FLINK-8703][tests] Port KafkaTestBase to MiniClusterResource
---