Repository: flink Updated Branches: refs/heads/master 3e9d33ee5 -> 209ae6c91
[FLINK-3061] Properly fail Kafka Consumer if broker is not available This closes #1395 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/209ae6c9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/209ae6c9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/209ae6c9 Branch: refs/heads/master Commit: 209ae6c916e1bff7126b074dfe831bbc7b113e4a Parents: 3e9d33e Author: Robert Metzger <rmetz...@apache.org> Authored: Mon Nov 23 17:57:26 2015 +0100 Committer: Robert Metzger <rmetz...@apache.org> Committed: Mon Nov 30 16:04:36 2015 +0100 ---------------------------------------------------------------------- .../connectors/kafka/FlinkKafkaConsumer.java | 11 +++++++- .../connectors/kafka/KafkaConsumerTestBase.java | 29 ++++++++++++++++++++ .../streaming/connectors/kafka/KafkaITCase.java | 5 ++++ 3 files changed, 44 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/209ae6c9/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java index e42faef..2d1d91a 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java @@ -306,6 +306,11 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T> // Connect to a broker to get the partitions List<PartitionInfo> partitionInfos = getPartitionsForTopic(topic, props); + if (partitionInfos.size() == 0) { + throw new RuntimeException("Unable to retrieve any partitions for topic " + topic + "." + + "Please check previous log entries"); + } + // get initial partitions list. The order of the partitions is important for consistent // partition id assignment in restart cases. this.partitions = new int[partitionInfos.size()]; @@ -424,7 +429,11 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T> } finally { if (offsetCommitter != null) { offsetCommitter.close(); - offsetCommitter.join(); + try { + offsetCommitter.join(); + } catch(InterruptedException ie) { + // ignore interrupt + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/209ae6c9/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 48f4c50..2116c01 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -123,6 +123,35 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { // select which tests to run. // ------------------------------------------------------------------------ + + /** + * Test that ensures the KafkaConsumer is properly failing if the topic doesnt exist + * and a wrong broker was specified + * + * @throws Exception + */ + public void runFailOnNoBrokerTest() throws Exception { + try { + Properties properties = new Properties(); + + StreamExecutionEnvironment see = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + see.getConfig().disableSysoutLogging(); + see.setNumberOfExecutionRetries(0); + see.setParallelism(1); + + // use wrong ports for the consumers + properties.setProperty("bootstrap.servers", "localhost:80"); + properties.setProperty("zookeeper.connect", "localhost:80"); + properties.setProperty("group.id", "test"); + FlinkKafkaConsumer<String> source = getConsumer("doesntexist", new SimpleStringSchema(), properties); + DataStream<String> stream = see.addSource(source); + stream.print(); + see.execute("No broker test"); + } catch(RuntimeException re){ + Assert.assertTrue("Wrong RuntimeException thrown", + re.getMessage().contains("Unable to retrieve any partitions for topic")); + } + } /** * Test that validates that checkpointing and checkpoint notification works properly */ http://git-wip-us.apache.org/repos/asf/flink/blob/209ae6c9/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java index 3ca7c5c..5f2cdbc 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java @@ -40,6 +40,11 @@ public class KafkaITCase extends KafkaConsumerTestBase { runCheckpointingTest(); } + @Test() + public void testFailOnNoBroker() throws Exception { + runFailOnNoBrokerTest(); + } + @Test public void testOffsetInZookeeper() throws Exception { runOffsetInZookeeperValidationTest();