Repository: flink
Updated Branches:
  refs/heads/master 3e9d33ee5 -> 209ae6c91


[FLINK-3061] Properly fail Kafka Consumer if broker is not available

This closes #1395


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/209ae6c9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/209ae6c9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/209ae6c9

Branch: refs/heads/master
Commit: 209ae6c916e1bff7126b074dfe831bbc7b113e4a
Parents: 3e9d33e
Author: Robert Metzger <rmetz...@apache.org>
Authored: Mon Nov 23 17:57:26 2015 +0100
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Mon Nov 30 16:04:36 2015 +0100

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumer.java    | 11 +++++++-
 .../connectors/kafka/KafkaConsumerTestBase.java | 29 ++++++++++++++++++++
 .../streaming/connectors/kafka/KafkaITCase.java |  5 ++++
 3 files changed, 44 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/209ae6c9/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
index e42faef..2d1d91a 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
@@ -306,6 +306,11 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
                // Connect to a broker to get the partitions
                List<PartitionInfo> partitionInfos = getPartitionsForTopic(topic, props);
 
+               if (partitionInfos.size() == 0) {
+                       throw new RuntimeException("Unable to retrieve any partitions for topic " + topic + "." +
+                                       "Please check previous log entries");
+               }
+
                // get initial partitions list. The order of the partitions is important for consistent
                // partition id assignment in restart cases.
                this.partitions = new int[partitionInfos.size()];
@@ -424,7 +429,11 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
                        } finally {
                                if (offsetCommitter != null) {
                                        offsetCommitter.close();
-                                       offsetCommitter.join();
+                                       try {
+                                               offsetCommitter.join();
+                                       } catch(InterruptedException ie) {
+                                               // ignore interrupt
+                                       }
                                }
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/209ae6c9/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 48f4c50..2116c01 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -123,6 +123,35 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
        //  select which tests to run.
        // ------------------------------------------------------------------------
 
+
+       /**
+        * Test that ensures the KafkaConsumer is properly failing if the topic doesnt exist
+        * and a wrong broker was specified
+        *
+        * @throws Exception
+        */
+       public void runFailOnNoBrokerTest() throws Exception {
+               try {
+                       Properties properties = new Properties();
+
+                       StreamExecutionEnvironment see = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+                       see.getConfig().disableSysoutLogging();
+                       see.setNumberOfExecutionRetries(0);
+                       see.setParallelism(1);
+
+                       // use wrong ports for the consumers
+                       properties.setProperty("bootstrap.servers", "localhost:80");
+                       properties.setProperty("zookeeper.connect", "localhost:80");
+                       properties.setProperty("group.id", "test");
+                       FlinkKafkaConsumer<String> source = getConsumer("doesntexist", new SimpleStringSchema(), properties);
+                       DataStream<String> stream = see.addSource(source);
+                       stream.print();
+                       see.execute("No broker test");
+               } catch(RuntimeException re){
+                       Assert.assertTrue("Wrong RuntimeException thrown",
+                                       re.getMessage().contains("Unable to retrieve any partitions for topic"));
+               }
+       }
        /**
         * Test that validates that checkpointing and checkpoint notification works properly
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/209ae6c9/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 3ca7c5c..5f2cdbc 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -40,6 +40,11 @@ public class KafkaITCase extends KafkaConsumerTestBase {
                runCheckpointingTest();
        }
 
+       @Test()
+       public void testFailOnNoBroker() throws Exception {
+               runFailOnNoBrokerTest();
+       }
+
        @Test
        public void testOffsetInZookeeper() throws Exception {
                runOffsetInZookeeperValidationTest();

Reply via email to