[GitHub] flink pull request: [FLINK-1753] Added Kafka connectors test and c...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/589#issuecomment-92834090 I would like to merge this PR soon because I need some of its changes for adding another KafkaITCase. Is it okay to postpone the refactoring to `TestStreamEnvironment` to a separate PR? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user gaborhermann commented on the pull request: https://github.com/apache/flink/pull/589#issuecomment-92846151 Yes, of course, we can postpone the refactor.
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/589
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/589#issuecomment-92902901 I'm merging the PR now
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/589#discussion_r28257271
--- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java ---
@@ -524,13 +537,149 @@ public void cancel() {
             }
         }
+    private static boolean leaderHasShutDown = false;
+
+    @Test
+    public void brokerFailureTest() throws Exception {
+        String topic = "brokerFailureTestTopic";
+
+        createTestTopic(topic, 2, 2);
-    private void createTestTopic(String topic, int numberOfPartitions) {
         KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
-        kafkaTopicUtils.createTopic(topic, numberOfPartitions, 1);
+        final String leaderToShutDown =
+                kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString();
+
+        final Thread brokerShutdown = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                shutdownKafkaBroker = false;
+                while (!shutdownKafkaBroker) {
+                    try {
+                        Thread.sleep(10);
+                    } catch (InterruptedException e) {
+                        LOG.warn("Interruption", e);
+                    }
+                }
+
+                for (KafkaServer kafkaServer : brokers) {
+                    if (leaderToShutDown.equals(
+                            kafkaServer.config().advertisedHostName()
+                                    + ":"
+                                    + kafkaServer.config().advertisedPort()
+                    )) {
+                        LOG.info("Killing Kafka Server {}", leaderToShutDown);
+                        kafkaServer.shutdown();
+                        leaderHasShutDown = true;
+                        break;
+                    }
+                }
+            }
+        });
+        brokerShutdown.start();
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
--- End diff --

You can add classes from another module's test/ directory as a Maven dependency using this mechanism: https://maven.apache.org/plugins/maven-jar-plugin/examples/create-test-jar.html
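The shutdown thread in the `brokerFailureTest` hunk sleep-polls `shutdownKafkaBroker` every 10 ms. `leaderHasShutDown` is declared as a plain `static boolean`, and the declaration of `shutdownKafkaBroker` is not part of the hunk; if such flags are not `volatile`, the Java memory model does not guarantee that one thread sees another thread's write. A minimal sketch of a latch-based alternative, assuming a hypothetical `ShutdownTrigger` class that is not part of the PR; the action passed in would be the loop that shuts down the broker matching `leaderToShutDown`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of the PR): runs a shutdown action on a
// background thread once fire() is called, without sleep-polling a flag.
final class ShutdownTrigger {
    private final CountDownLatch signal = new CountDownLatch(1);
    private final CountDownLatch finished = new CountDownLatch(1);
    private final Thread worker;

    ShutdownTrigger(final Runnable shutdownAction) {
        worker = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    signal.await();        // blocks until fire(); no busy-waiting
                    shutdownAction.run();  // e.g. shut down the current partition leader
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();  // exit without running the action
                } finally {
                    finished.countDown();  // worker thread is done either way
                }
            }
        });
        worker.setDaemon(true);
    }

    void start() {
        worker.start();
    }

    // Called from the test (for example from inside the running job) to trigger the shutdown.
    void fire() {
        signal.countDown();
    }

    // Waits for the worker thread to finish; returns false on timeout or interruption.
    boolean awaitDone(long timeout, TimeUnit unit) {
        try {
            return finished.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

`CountDownLatch` guarantees that actions before `countDown()` happen-before a successful return from `await()`, so a result flag set by the action is visible to the thread that returns from `awaitDone`.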
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/589#discussion_r28227428
--- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
+import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.api.PartitionMetadata;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+
+public class KafkaTopicUtilsTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicUtilsTest.class);
+    private static final int NUMBER_OF_BROKERS = 2;
+    private static final String TOPIC = "myTopic";
+
+    @Rule
+    public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Test
+    public void test() {
+        int zkPort;
+        String kafkaHost;
+        String zookeeperConnectionString;
+
+        File tmpZkDir;
+        List<File> tmpKafkaDirs;
+        Map<String, KafkaServer> kafkaServers = null;
+        TestingServer zookeeper = null;
+
+        try {
+            tmpZkDir = tempFolder.newFolder();
+
+            tmpKafkaDirs = new ArrayList<File>(NUMBER_OF_BROKERS);
+            for (int i = 0; i < NUMBER_OF_BROKERS; i++) {
+                tmpKafkaDirs.add(tempFolder.newFolder());
+            }
+
+            zkPort = NetUtils.getAvailablePort();
+            kafkaHost = InetAddress.getLocalHost().getHostName();
+            zookeeperConnectionString = "localhost:" + zkPort;
+
+            // init zookeeper
+            zookeeper = new TestingServer(zkPort, tmpZkDir);
+
+            // init kafka kafkaServers
+            kafkaServers = new HashMap<String, KafkaServer>();
+
+            for (int i = 0; i < NUMBER_OF_BROKERS; i++) {
+                KafkaServer kafkaServer = getKafkaServer(kafkaHost, zookeeperConnectionString, i, tmpKafkaDirs.get(i));
+                kafkaServers.put(kafkaServer.config().advertisedHostName() + ":" + kafkaServer.config().advertisedPort(), kafkaServer);
+            }
+
+            // create Kafka topic
+            final KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
+            kafkaTopicUtils.createTopic(TOPIC, 1, 2);
+
+            // check whether topic exists
+            assertTrue(kafkaTopicUtils.topicExists(TOPIC));
+
+            // check number of partitions
+            assertEquals(1, kafkaTopicUtils.getNumberOfPartitions(TOPIC));
+
+            // get partition metadata without error
+            PartitionMetadata partitionMetadata = kafkaTopicUtils.waitAndGetPartitionMetadata(TOPIC, 0);
+            assertEquals(0, partitionMetadata.errorCode());
+
+            // get broker list
+
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/589#discussion_r28227273
--- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java ---
@@ -111,18 +128,38 @@ public void initialize() throws InterruptedException {
         } while (metadata == null);

         if (metadata.leader() == null) {
-            throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts.get(0)
-                    + ":" + port);
+            throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts + ")");
         }

-        leadBroker = metadata.leader().host();
+        leadBroker = metadata.leader();
         clientName = "Client_" + topic + "_" + partition;

-        consumer = new SimpleConsumer(leadBroker, port, connectTimeoutMs, bufferSize, clientName);
+        consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), connectTimeoutMs, bufferSize, clientName);
+
+        try {
+            readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
+        } catch (NotLeaderForPartitionException e) {
+            do {
+
+                metadata = findLeader(hosts, topic, partition);
-        readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
+                try {
+                    Thread.sleep(DEFAULT_WAIT_ON_EMPTY_FETCH);
+                } catch (InterruptedException ie) {
+                    throw new InterruptedException("Establishing connection to Kafka failed");
+                }
+            } while (metadata == null);
--- End diff --

This loop might also run forever.
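A minimal sketch of capping the `findLeader` retry loop, assuming a hypothetical `BoundedRetry` helper and an attempt limit that the PR does not define. In `KafkaSinglePartitionIterator` the `Callable` would wrap `findLeader(hosts, topic, partition)` and `waitMillis` would be `DEFAULT_WAIT_ON_EMPTY_FETCH`.

```java
import java.util.concurrent.Callable;

// Hypothetical helper (not part of the PR): calls `attempt` until it returns a
// non-null value, sleeping between tries, and gives up after maxAttempts.
final class BoundedRetry {
    private BoundedRetry() {
    }

    static <T> T retryUntilNonNull(Callable<T> attempt, int maxAttempts, long waitMillis)
            throws Exception {
        for (int i = 1; i <= maxAttempts; i++) {
            T result = attempt.call();
            if (result != null) {
                return result;
            }
            if (i < maxAttempts) {
                // Same role as Thread.sleep(DEFAULT_WAIT_ON_EMPTY_FETCH) in the diff.
                Thread.sleep(waitMillis);
            }
        }
        // Fail loudly instead of spinning forever.
        throw new IllegalStateException("No result after " + maxAttempts + " attempts");
    }
}
```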
Github user mbalassi commented on a diff in the pull request: https://github.com/apache/flink/pull/589#discussion_r28227590
--- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java ---
@@ -524,13 +537,149 @@ public void cancel() {
             }
         }
+    private static boolean leaderHasShutDown = false;
+
+    @Test
+    public void brokerFailureTest() throws Exception {
+        String topic = "brokerFailureTestTopic";
+
+        createTestTopic(topic, 2, 2);
-    private void createTestTopic(String topic, int numberOfPartitions) {
         KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
-        kafkaTopicUtils.createTopic(topic, numberOfPartitions, 1);
+        final String leaderToShutDown =
+                kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString();
+
+        final Thread brokerShutdown = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                shutdownKafkaBroker = false;
+                while (!shutdownKafkaBroker) {
+                    try {
+                        Thread.sleep(10);
+                    } catch (InterruptedException e) {
+                        LOG.warn("Interruption", e);
+                    }
+                }
+
+                for (KafkaServer kafkaServer : brokers) {
+                    if (leaderToShutDown.equals(
+                            kafkaServer.config().advertisedHostName()
+                                    + ":"
+                                    + kafkaServer.config().advertisedPort()
+                    )) {
+                        LOG.info("Killing Kafka Server {}", leaderToShutDown);
+                        kafkaServer.shutdown();
+                        leaderHasShutDown = true;
+                        break;
+                    }
+                }
+            }
+        });
+        brokerShutdown.start();
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
--- End diff --

I would use `TestStreamEnvironment` instead.
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/589#discussion_r28227565
--- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java ---
@@ -114,17 +132,24 @@ public KafkaSink(String zookeeperAddress, String topicId,
     public void open(Configuration configuration) {
         KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperAddress);
-        String brokerAddress = kafkaTopicUtils.getLeaderBrokerAddressForTopic(topicId);
+        String listOfBrokers = kafkaTopicUtils.getBrokerList(topicId);
+
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Broker list: {}", listOfBrokers);
+        }
-        props = new Properties();
+        if (props == null) {
--- End diff --

How about always setting our default properties (broker list, max retries, etc.) and then overwriting them with the properties passed by the user?
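A sketch of the suggested ordering, assuming the Kafka 0.8 producer keys `metadata.broker.list` and `message.send.max.retries`; the helper name `ProducerPropsSketch` and the retry value of 10 are illustrative, not taken from the PR.

```java
import java.util.Properties;

// Hypothetical helper (not part of the PR): connector defaults first,
// then user-supplied properties override them.
final class ProducerPropsSketch {
    private ProducerPropsSketch() {
    }

    static Properties withDefaults(String brokerList, Properties userProps) {
        Properties merged = new Properties();
        // Defaults set by the connector (the retry value is an illustrative assumption).
        merged.setProperty("metadata.broker.list", brokerList);
        merged.setProperty("message.send.max.retries", "10");
        // User entries are applied last, so they win on key collisions.
        if (userProps != null) {
            // stringPropertyNames() also includes defaults nested inside userProps.
            for (String key : userProps.stringPropertyNames()) {
                merged.setProperty(key, userProps.getProperty(key));
            }
        }
        return merged;
    }
}
```

Entries are copied into one flat `Properties` object instead of chaining with `new Properties(defaults)`, because `Hashtable` methods such as `containsKey` and `entrySet` do not consult chained defaults.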
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/589#discussion_r28227177
--- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java ---
@@ -65,36 +75,145 @@ public void createTopic(String topicName, int numOfPartitions, int replicationFa
                 LOG.warn("Kafka topic \"{}\" already exists. Returning without action.", topicName);
             }
         } else {
+            LOG.info("Connecting zookeeper");
+
+            initZkClient();
             AdminUtils.createTopic(zkClient, topicName, numOfPartitions, replicationFactor, topicConfig);
+            closeZkClient();
+        }
+    }
+
+    public String getBrokerList(String topicName) {
+        return getBrokerAddressList(getBrokerAddresses(topicName));
+    }
+
+    public String getBrokerList(String topicName, int partitionId) {
+        return getBrokerAddressList(getBrokerAddresses(topicName, partitionId));
+    }
+
+    public Set<String> getBrokerAddresses(String topicName) {
+        int numOfPartitions = getNumberOfPartitions(topicName);
+
+        HashSet<String> brokers = new HashSet<String>();
+        for (int i = 0; i < numOfPartitions; i++) {
+            brokers.addAll(getBrokerAddresses(topicName, i));
         }
+        return brokers;
+    }
+
+    public Set<String> getBrokerAddresses(String topicName, int partitionId) {
+        PartitionMetadata partitionMetadata = waitAndGetPartitionMetadata(topicName, partitionId);
+        Collection<Broker> inSyncReplicas = JavaConversions.asJavaCollection(partitionMetadata.isr());
+
+        HashSet<String> addresses = new HashSet<String>();
+        for (Broker broker : inSyncReplicas) {
+            addresses.add(broker.connectionString());
+        }
+        return addresses;
+    }
+
+    private static String getBrokerAddressList(Set<String> brokerAddresses) {
+        StringBuilder brokerAddressList = new StringBuilder();
+        for (String broker : brokerAddresses) {
+            brokerAddressList.append(broker);
+            brokerAddressList.append(',');
+        }
+        brokerAddressList.deleteCharAt(brokerAddressList.length() - 1);
+
+        return brokerAddressList.toString();
     }

     public int getNumberOfPartitions(String topicName) {
-        Seq<PartitionMetadata> partitionMetadataSeq = getTopicInfo(topicName).partitionsMetadata();
+        Seq<PartitionMetadata> partitionMetadataSeq = getTopicMetadata(topicName).partitionsMetadata();
         return JavaConversions.asJavaCollection(partitionMetadataSeq).size();
     }

-    public String getLeaderBrokerAddressForTopic(String topicName) {
-        TopicMetadata topicInfo = getTopicInfo(topicName);
+    public PartitionMetadata waitAndGetPartitionMetadata(String topicName, int partitionId) {
+        PartitionMetadata partitionMetadata;
+        while (true) {
+            try {
+                partitionMetadata = getPartitionMetadata(topicName, partitionId);
+                return partitionMetadata;
+            } catch (LeaderNotAvailableException e) {
+                // try fetching metadata again
--- End diff --

I would suggest logging the exception with `LOG.debug`.
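`getBrokerAddressList` in this hunk calls `deleteCharAt(length() - 1)` unconditionally, so an empty address set (for example, when no in-sync replicas are reported) would raise a `StringIndexOutOfBoundsException`. A minimal sketch of a join that only inserts separators between entries and rejects an empty set explicitly; `BrokerListJoin` is a hypothetical name, not part of the PR.

```java
import java.util.Set;

// Hypothetical alternative to getBrokerAddressList: comma-joins "host:port"
// strings and fails with a clear message when there is nothing to join.
final class BrokerListJoin {
    private BrokerListJoin() {
    }

    static String join(Set<String> brokerAddresses) {
        if (brokerAddresses.isEmpty()) {
            throw new IllegalArgumentException("No in-sync broker addresses found");
        }
        StringBuilder sb = new StringBuilder();
        for (String broker : brokerAddresses) {
            if (sb.length() > 0) {
                sb.append(',');  // separator only between entries, never trailing
            }
            sb.append(broker);
        }
        return sb.toString();
    }
}
```

On Java 8 or later, `String.join(",", brokerAddresses)` produces the same output for a non-empty set.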
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/589#discussion_r28227162
--- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java ---
@@ -65,36 +75,145 @@ public void createTopic(String topicName, int numOfPartitions, int replicationFa
                 LOG.warn("Kafka topic \"{}\" already exists. Returning without action.", topicName);
             }
         } else {
+            LOG.info("Connecting zookeeper");
+
+            initZkClient();
             AdminUtils.createTopic(zkClient, topicName, numOfPartitions, replicationFactor, topicConfig);
+            closeZkClient();
+        }
+    }
+
+    public String getBrokerList(String topicName) {
+        return getBrokerAddressList(getBrokerAddresses(topicName));
+    }
+
+    public String getBrokerList(String topicName, int partitionId) {
+        return getBrokerAddressList(getBrokerAddresses(topicName, partitionId));
+    }
+
+    public Set<String> getBrokerAddresses(String topicName) {
+        int numOfPartitions = getNumberOfPartitions(topicName);
+
+        HashSet<String> brokers = new HashSet<String>();
+        for (int i = 0; i < numOfPartitions; i++) {
+            brokers.addAll(getBrokerAddresses(topicName, i));
         }
+        return brokers;
+    }
+
+    public Set<String> getBrokerAddresses(String topicName, int partitionId) {
+        PartitionMetadata partitionMetadata = waitAndGetPartitionMetadata(topicName, partitionId);
+        Collection<Broker> inSyncReplicas = JavaConversions.asJavaCollection(partitionMetadata.isr());
+
+        HashSet<String> addresses = new HashSet<String>();
+        for (Broker broker : inSyncReplicas) {
+            addresses.add(broker.connectionString());
+        }
+        return addresses;
+    }
+
+    private static String getBrokerAddressList(Set<String> brokerAddresses) {
+        StringBuilder brokerAddressList = new StringBuilder();
+        for (String broker : brokerAddresses) {
+            brokerAddressList.append(broker);
+            brokerAddressList.append(',');
+        }
+        brokerAddressList.deleteCharAt(brokerAddressList.length() - 1);
+
+        return brokerAddressList.toString();
     }

     public int getNumberOfPartitions(String topicName) {
-        Seq<PartitionMetadata> partitionMetadataSeq = getTopicInfo(topicName).partitionsMetadata();
+        Seq<PartitionMetadata> partitionMetadataSeq = getTopicMetadata(topicName).partitionsMetadata();
         return JavaConversions.asJavaCollection(partitionMetadataSeq).size();
     }

-    public String getLeaderBrokerAddressForTopic(String topicName) {
-        TopicMetadata topicInfo = getTopicInfo(topicName);
+    public PartitionMetadata waitAndGetPartitionMetadata(String topicName, int partitionId) {
+        PartitionMetadata partitionMetadata;
+        while (true) {
+            try {
+                partitionMetadata = getPartitionMetadata(topicName, partitionId);
+                return partitionMetadata;
+            } catch (LeaderNotAvailableException e) {
+                // try fetching metadata again
+            }
--- End diff --

We might end up in an infinite loop here.
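A sketch that addresses both review points on `waitAndGetPartitionMetadata`: the loop gives up after a deadline, and each swallowed failure is logged at a debug level. It assumes a hypothetical `WaitWithTimeout` helper and uses `java.util.logging` with `Level.FINE` as a stand-in for SLF4J's `LOG.debug`; in `KafkaTopicUtils` the lookup would be `getPartitionMetadata(topicName, partitionId)` and `retryOn` would be `LeaderNotAvailableException.class`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper (not part of the PR): retries a lookup until it succeeds
// or a deadline passes, logging each retried failure at FINE (debug) level.
final class WaitWithTimeout {
    // java.util.logging stands in for SLF4J here; Level.FINE plays the role of LOG.debug.
    private static final Logger LOG = Logger.getLogger(WaitWithTimeout.class.getName());

    private WaitWithTimeout() {
    }

    static <T> T waitFor(Callable<T> lookup, Class<? extends Exception> retryOn,
                         long timeoutMillis, long pollMillis) throws Exception {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            try {
                return lookup.call();
            } catch (Exception e) {
                if (!retryOn.isInstance(e)) {
                    throw e;  // unexpected failure type: propagate instead of retrying
                }
                LOG.log(Level.FINE, "Lookup failed, retrying", e);
                if (System.nanoTime() - deadline >= 0) {  // overflow-safe deadline check
                    TimeoutException timeout =
                            new TimeoutException("Gave up after " + timeoutMillis + " ms");
                    timeout.initCause(e);
                    throw timeout;
                }
            }
            Thread.sleep(pollMillis);
        }
    }
}
```

Checking the deadline only after a failed attempt guarantees at least one lookup, even with a timeout of zero.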
Github user gaborhermann commented on the pull request: https://github.com/apache/flink/pull/589#issuecomment-92413840 Thanks for the comments; I made the proposed changes (except Marton's).
Github user gaborhermann commented on a diff in the pull request: https://github.com/apache/flink/pull/589#discussion_r28251514
--- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java ---
@@ -524,13 +537,149 @@ public void cancel() {
             }
         }
+    private static boolean leaderHasShutDown = false;
+
+    @Test
+    public void brokerFailureTest() throws Exception {
+        String topic = "brokerFailureTestTopic";
+
+        createTestTopic(topic, 2, 2);
-    private void createTestTopic(String topic, int numberOfPartitions) {
         KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
-        kafkaTopicUtils.createTopic(topic, numberOfPartitions, 1);
+        final String leaderToShutDown =
+                kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString();
+
+        final Thread brokerShutdown = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                shutdownKafkaBroker = false;
+                while (!shutdownKafkaBroker) {
+                    try {
+                        Thread.sleep(10);
+                    } catch (InterruptedException e) {
+                        LOG.warn("Interruption", e);
+                    }
+                }
+
+                for (KafkaServer kafkaServer : brokers) {
+                    if (leaderToShutDown.equals(
+                            kafkaServer.config().advertisedHostName()
+                                    + ":"
+                                    + kafkaServer.config().advertisedPort()
+                    )) {
+                        LOG.info("Killing Kafka Server {}", leaderToShutDown);
+                        kafkaServer.shutdown();
+                        leaderHasShutDown = true;
+                        break;
+                    }
+                }
+            }
+        });
+        brokerShutdown.start();
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
--- End diff --

I could not access `TestStreamEnvironment` from flink-streaming-connectors. Should `TestStreamEnvironment` be moved from the test directory to the main one?