[GitHub] flink pull request #5669: [FLINK-8703][tests] Port KafkaTestBase to MiniClus...

2018-04-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5669


---


[GitHub] flink pull request #5669: [FLINK-8703][tests] Port KafkaTestBase to MiniClus...

2018-04-04 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5669#discussion_r179332025
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -1063,23 +1078,27 @@ public void runCancelingOnEmptyInputTest() throws 
Exception {
 
final AtomicReference error = new 
AtomicReference<>();
 
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
--- End diff --

As you wish, I can open a follow up since it's a trivial fixup.


---


[GitHub] flink pull request #5669: [FLINK-8703][tests] Port KafkaTestBase to MiniClus...

2018-04-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5669#discussion_r178769136
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -1063,23 +1078,27 @@ public void runCancelingOnEmptyInputTest() throws 
Exception {
 
final AtomicReference error = new 
AtomicReference<>();
 
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
--- End diff --

That would go further than porting the test; I'd rather not do that in this 
PR.




---


[GitHub] flink pull request #5669: [FLINK-8703][tests] Port KafkaTestBase to MiniClus...

2018-03-28 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5669#discussion_r177747320
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -1063,23 +1078,27 @@ public void runCancelingOnEmptyInputTest() throws 
Exception {
 
final AtomicReference error = new 
AtomicReference<>();
 
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
--- End diff --

Please unify and extract common code between `runCancelingOnFullInputTest` 
and `runCancelingOnEmptyInputTest` for example into:

```
private void runCancelingTest(boolean emptyInput) throws Exception {
final String topic = emptyInput ? "cancelingOnEmptyInputTopic" 
: "cancelingOnFullTopic";

final int parallelism = 3;
createTestTopic(topic, parallelism, 1);

// launch a producer thread
DataGenerators.InfiniteStringsGenerator generator =
new 
DataGenerators.InfiniteStringsGenerator(kafkaServer, topic);
if (!emptyInput) {
generator.start();
}

// launch a consumer asynchronously

final AtomicReference jobError = new 
AtomicReference<>();

final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
env.enableCheckpointing(100);
env.getConfig().disableSysoutLogging();

Properties props = new Properties();
props.putAll(standardProps);
props.putAll(secureProps);
FlinkKafkaConsumerBase source = 
kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);

env.addSource(source).addSink(new DiscardingSink());

JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
final JobID jobId = jobGraph.getJobID();

final Runnable jobRunner = new Runnable() {
@Override
public void run() {
try {
client.setDetached(false);
client.submitJob(jobGraph, 
KafkaConsumerTestBase.class.getClassLoader());
}
catch (Throwable t) {
jobError.set(t);
}
}
};

Thread runnerThread = new Thread(jobRunner, "program runner 
thread");
runnerThread.start();

// wait a bit before canceling
Thread.sleep(2000);

Throwable failueCause = jobError.get();
if (failueCause != null) {
failueCause.printStackTrace();
Assert.fail("Test failed prematurely with: " + 
failueCause.getMessage());
}

// cancel
client.cancel(jobId);

// wait for the program to be done and validate that we failed 
with the right exception
runnerThread.join();

assertEquals(JobStatus.CANCELED, 
client.getJobStatus(jobId).get());

if (generator.isAlive()) {
generator.shutdown();
generator.join();
}
else if (!emptyInput) {
Throwable t = generator.getError();
if (t != null) {
t.printStackTrace();
fail("Generator failed: " + t.getMessage());
} else {
fail("Generator failed with no exception");
}
}

deleteTestTopic(topic);
}
```




---


[GitHub] flink pull request #5669: [FLINK-8703][tests] Port KafkaTestBase to MiniClus...

2018-03-28 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5669#discussion_r177742687
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -244,7 +255,7 @@ public void run() {
while (System.nanoTime() < deadline);
 
// cancel the job & wait for the job to finish
-   
JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+   client.cancel(getRunningJobs(client).get(0));
--- End diff --

`Iterables.getOnlyElement(getRunningJobs(client))`?


---


[GitHub] flink pull request #5669: [FLINK-8703][tests] Port KafkaTestBase to MiniClus...

2018-03-28 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5669#discussion_r177743725
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -132,8 +140,11 @@
 * the same mini cluster. Otherwise, missing slots may happen.
 */
@Before
-   public void ensureNoJobIsLingering() throws Exception {
-   
JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+   public void setClientAndEnsureNoJobIsLingering() throws Exception {
+   client = flink.getClusterClient();
+   while (!getRunningJobs(client).isEmpty()){
--- End diff --

```
while (!getRunningJobs(client).isEmpty()){
Thread.sleep(50);
}
```

This is being copied pasted couple of times. Please extract to common 
method. Maybe to an equivalent of `JobManagerCommunicationUtils` or even to 
some `TestingClusterClient`?


---


[GitHub] flink pull request #5669: [FLINK-8703][tests] Port KafkaTestBase to MiniClus...

2018-03-09 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5669#discussion_r173455497
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
 ---
@@ -81,12 +81,6 @@ public void testCancelingEmptyTopic() throws Exception {
public void testCancelingFullTopic() throws Exception {
runCancelingOnFullInputTest();
}
-
--- End diff --

ah right, forgot to comment that. I found these test to be rather..._odd_. 
They check behavior when not enough slots are available, but in the old code 
afaik this fails before the client even submits the job, and in flip6 this 
stalls as we never check whether enough slots are available (i guess with the 
underlying assumption that we would just allocate more TMs until we have 
enough).


---


[GitHub] flink pull request #5669: [FLINK-8703][tests] Port KafkaTestBase to MiniClus...

2018-03-09 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5669#discussion_r173437603
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
 ---
@@ -81,12 +81,6 @@ public void testCancelingEmptyTopic() throws Exception {
public void testCancelingFullTopic() throws Exception {
runCancelingOnFullInputTest();
}
-
--- End diff --

Why are those removed?


---


[GitHub] flink pull request #5669: [FLINK-8703][tests] Port KafkaTestBase to MiniClus...

2018-03-09 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5669

[FLINK-8703][tests] Port KafkaTestBase to MiniClusterResource 

## What is the purpose of the change

Ports the `KafkaTestBase` and extending classes to use 
`MiniClusterResource`.

## Verifying this change

Run all tests extending `KafkaTestBase` with `flip6` profile 
enabled/disabled.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8703_kafkaB

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5669.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5669


commit 545b4dcccf79ce1b8bb530bae77ab4c2b6e85351
Author: zentol 
Date:   2018-03-07T12:38:03Z

Remove Kafka testFailOnDeploy test

commit 3cdfd906cdefa85fe36b6717f41e831ca7e5ea72
Author: zentol 
Date:   2018-03-07T12:39:25Z

[FLINK-8703][tests] Port KafkaTestBase to MiniClusterResource




---