Re: Spark Streaming - Failed to find leader
I'm running kafka_2.10-0.8.2.1, and loading kafka_2.10-0.8.2.1.jar via the --jars argument to spark-submit.

sbt:

    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.2"
    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.2.2"
    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.2.2"

spark-submit:

    spark-submit --jars kafka_2.10-0.8.2.1.jar,metrics-core-2.2.0.jar,spark-streaming-kafka_2.10-1.2.2.jar,zkclient-0.1.0.jar

On Sat, May 9, 2015 at 10:39 AM Will Briggs wrbri...@gmail.com wrote:

There's something wrong with your classpath - are you using Maven to handle dependencies? Make sure you are bringing in the correct Kafka client library. See here: http://stackoverflow.com/questions/28353316/kafka-utils-wrong-classpath-org-apache-kafka-common-utils-utils

On May 9, 2015, at 1:15 PM, Marty B sp...@mjhb.com wrote:

My simple Spark streaming app is failing:

    ConsumerFetcherManager$LeaderFinderThread: [test-group_appdev-1431189419702-9d5d7cbc-leader-finder-thread], Failed to find leader for Set([test-topic,0])
    java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/Utils

I created the topic 'test-topic' with replication-factor 1 and 1 partition, and validated message delivery with the kafka-console-producer and consumer utilities. Running default configurations for Kafka and ZooKeeper, with everything (including Spark) running locally on my Linux laptop.

This warning repeats continuously, and the app consumes no messages:

    15/05/09 09:37:00 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1431189419735] Added fetcher for partitions ArrayBuffer()
    15/05/09 09:37:00 WARN ConsumerFetcherManager$LeaderFinderThread: [test-group_appdev-1431189419702-9d5d7cbc-leader-finder-thread], Failed to find leader for Set([test-topic,0])
    java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/Utils
        at kafka.cluster.Broker.connectionString(Broker.scala:62)
        at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
        at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

Spark streaming experiment app:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._

    object TestApp {
      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("Test App")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        val group = "test-group"
        val topicMap = Map("test-topic" -> 1)
        val zkQuorum = "localhost:2181"
        val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
        messages.foreachRDD { rdd => println("RDD", rdd.count) }
        ssc.start()
        ssc.awaitTermination()
      }
    }

Is there anything that I should be doing differently to avoid this error?
Spark Streaming - Failed to find leader
My simple Spark streaming app is failing:

    ConsumerFetcherManager$LeaderFinderThread: [test-group_appdev-1431189419702-9d5d7cbc-leader-finder-thread], Failed to find leader for Set([test-topic,0])
    java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/Utils

I created the topic 'test-topic' with replication-factor 1 and 1 partition, and validated message delivery with the kafka-console-producer and consumer utilities. Running default configurations for Kafka and ZooKeeper, with everything (including Spark) running locally on my Linux laptop.

This warning repeats continuously, and the app consumes no messages:

    15/05/09 09:37:00 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1431189419735] Added fetcher for partitions ArrayBuffer()
    15/05/09 09:37:00 WARN ConsumerFetcherManager$LeaderFinderThread: [test-group_appdev-1431189419702-9d5d7cbc-leader-finder-thread], Failed to find leader for Set([test-topic,0])
    java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/Utils
        at kafka.cluster.Broker.connectionString(Broker.scala:62)
        at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
        at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

Spark streaming experiment app:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._

    object TestApp {
      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("Test App")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        val group = "test-group"
        val topicMap = Map("test-topic" -> 1)
        val zkQuorum = "localhost:2181"
        val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
        messages.foreachRDD { rdd => println("RDD", rdd.count) }
        ssc.start()
        ssc.awaitTermination()
      }
    }

Is there anything that I should be doing differently to avoid this error?
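The missing class, org.apache.kafka.common.utils.Utils, lives in the separate kafka-clients jar in Kafka 0.8.2.x; kafka_2.10-0.8.2.1.jar depends on it, but --jars does not pull in transitive dependencies. A sketch of an invocation that also ships that jar - the jar names and paths are illustrative and assume the jars sit in the working directory:

```shell
# Sketch: add kafka-clients-0.8.2.1.jar (which contains
# org.apache.kafka.common.utils.Utils) to the --jars list.
# Application jar path and class name are illustrative.
spark-submit \
  --class TestApp \
  --jars kafka_2.10-0.8.2.1.jar,kafka-clients-0.8.2.1.jar,metrics-core-2.2.0.jar,spark-streaming-kafka_2.10-1.2.2.jar,zkclient-0.1.0.jar \
  target/scala-2.10/test-app_2.10-0.1.jar
```

An alternative is to build a fat jar (e.g. with sbt-assembly) so the Kafka client classes are bundled with the application instead of listed one by one.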
Re: Is there a way to know when I've reached the end of a partition (consumed all messages) when using the high-level consumer?
Hi James,

What are you trying to do exactly? If all you are trying to do is monitor how far behind a consumer is getting, you could use the ConsumerOffsetChecker, as described in the link below.
http://community.spiceworks.com/how_to/77610-how-far-behind-is-your-kafka-consumer

Each message being processed will also have its offset and partition attached to it, so I suppose with that data plus info from a fetch response you could determine this within an application.
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse

Does that help?

On Fri, May 8, 2015 at 6:04 PM, James Cheng jch...@tivo.com wrote:

Hi,

I want to use the high level consumer to read all partitions for a topic, and know when I have reached the end. I know "the end" might be a little vague, since items keep showing up, but I'm trying to get as close as possible: I know that more messages might show up later, but I want to know when I've received all the items that are currently available in the topic.

Is there a standard/recommended way to do this?

I know one way to do it is to first issue an OffsetRequest for each partition, which would get me the last offset, and then use that information in my high level consumer to detect when I've reached a message with that offset. Which is exactly what the SimpleConsumer example does (https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example). That involves finding the leader for the partition, etc. Not hard, but a bunch of steps.

I noticed that kafkacat has an option similar to what I'm looking for:

    -e  Exit successfully when last message received

Looking at the code, it appears that a FetchRequest returns the HighwaterMarkOffset for a partition, and the API docs confirm that:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse

Does the Java high-level consumer expose the HighwaterMarkOffset in any way? I looked but I couldn't find such a thing.

Thanks,
-James
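The figure ConsumerOffsetChecker reports per partition is essentially the broker's log-end offset minus the offset the consumer group has committed. A minimal sketch of that arithmetic - the class and method names here are ours, not part of the Kafka API:

```java
// Illustrative computation of per-partition consumer lag, as reported by
// tools like ConsumerOffsetChecker: how many messages exist in the log
// beyond what the consumer group has committed.
public class ConsumerLag {
    /** Lag = broker's log-end offset minus the group's committed offset. */
    public static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    public static void main(String[] args) {
        // 100 messages in the partition, consumer committed through offset 90.
        System.out.println(lag(100, 90)); // 10 messages behind
        System.out.println(lag(100, 100)); // 0 means fully caught up
    }
}
```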
Re: Spark Streaming - Failed to find leader
There's something wrong with your classpath - are you using Maven to handle dependencies? Make sure you are bringing in the correct Kafka client library. See here: http://stackoverflow.com/questions/28353316/kafka-utils-wrong-classpath-org-apache-kafka-common-utils-utils

On May 9, 2015, at 1:15 PM, Marty B sp...@mjhb.com wrote:

My simple Spark streaming app is failing:

    ConsumerFetcherManager$LeaderFinderThread: [test-group_appdev-1431189419702-9d5d7cbc-leader-finder-thread], Failed to find leader for Set([test-topic,0])
    java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/Utils

I created the topic 'test-topic' with replication-factor 1 and 1 partition, and validated message delivery with the kafka-console-producer and consumer utilities. Running default configurations for Kafka and ZooKeeper, with everything (including Spark) running locally on my Linux laptop.

This warning repeats continuously, and the app consumes no messages:

    15/05/09 09:37:00 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1431189419735] Added fetcher for partitions ArrayBuffer()
    15/05/09 09:37:00 WARN ConsumerFetcherManager$LeaderFinderThread: [test-group_appdev-1431189419702-9d5d7cbc-leader-finder-thread], Failed to find leader for Set([test-topic,0])
    java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/Utils
        at kafka.cluster.Broker.connectionString(Broker.scala:62)
        at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
        at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

Spark streaming experiment app:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._

    object TestApp {
      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("Test App")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        val group = "test-group"
        val topicMap = Map("test-topic" -> 1)
        val zkQuorum = "localhost:2181"
        val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
        messages.foreachRDD { rdd => println("RDD", rdd.count) }
        ssc.start()
        ssc.awaitTermination()
      }
    }

Is there anything that I should be doing differently to avoid this error?
Re: Is there a way to know when I've reached the end of a partition (consumed all messages) when using the high-level consumer?
I've created a patch to expose the high watermark, having this exact requirement. Still waiting for it to be accepted, but we are using this in production at the moment and it works quite nicely: https://issues.apache.org/jira/browse/KAFKA-1977

On Sat, 9 May 2015 at 18:43 Charlie Knudsen charlie.knud...@smartthings.com wrote:

Hi James,

What are you trying to do exactly? If all you are trying to do is monitor how far behind a consumer is getting, you could use the ConsumerOffsetChecker, as described in the link below.
http://community.spiceworks.com/how_to/77610-how-far-behind-is-your-kafka-consumer

Each message being processed will also have its offset and partition attached to it, so I suppose with that data plus info from a fetch response you could determine this within an application.
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse

Does that help?

On Fri, May 8, 2015 at 6:04 PM, James Cheng jch...@tivo.com wrote:

Hi,

I want to use the high level consumer to read all partitions for a topic, and know when I have reached the end. I know "the end" might be a little vague, since items keep showing up, but I'm trying to get as close as possible: I know that more messages might show up later, but I want to know when I've received all the items that are currently available in the topic.

Is there a standard/recommended way to do this?

I know one way to do it is to first issue an OffsetRequest for each partition, which would get me the last offset, and then use that information in my high level consumer to detect when I've reached a message with that offset. Which is exactly what the SimpleConsumer example does (https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example). That involves finding the leader for the partition, etc. Not hard, but a bunch of steps.

I noticed that kafkacat has an option similar to what I'm looking for:

    -e  Exit successfully when last message received

Looking at the code, it appears that a FetchRequest returns the HighwaterMarkOffset for a partition, and the API docs confirm that:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse

Does the Java high-level consumer expose the HighwaterMarkOffset in any way? I looked but I couldn't find such a thing.

Thanks,
-James
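The end-of-partition test sketched in the thread boils down to comparing the next offset the consumer would fetch against the partition's high watermark (available in a SimpleConsumer fetch response, or via the KAFKA-1977 patch). A minimal sketch, with names of our own invention rather than Kafka API names:

```java
// Minimal sketch of the "have I reached the end?" check, assuming the
// partition's high watermark can be obtained alongside each consumed message
// (e.g. from a fetch response). Class and method names are illustrative.
public class PartitionEndCheck {
    /** The consumer has caught up once the next offset it would fetch
     *  reaches the high watermark the broker reports for the partition. */
    public static boolean caughtUp(long nextOffsetToFetch, long highWatermark) {
        return nextOffsetToFetch >= highWatermark;
    }

    public static void main(String[] args) {
        System.out.println(caughtUp(0, 0));   // empty partition: trivially at the end
        System.out.println(caughtUp(42, 42)); // consumed offsets 0..41, nothing left
        System.out.println(caughtUp(10, 42)); // still 32 messages behind
    }
}
```

More messages may of course arrive after the check returns true; this only detects "caught up as of the last fetch", which matches kafkacat's -e behavior.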
Re: New Java producer broker metadata update stuck
With the ip -> broker id mapping: we've got 3 servers with three ip's and id's 1, 2, 3. The servers kept their ip's but the broker id's switched around, so something like:

    192.168.0.1 -> 1
    192.168.0.2 -> 2
    192.168.0.3 -> 3

then they all stopped and came back as:

    192.168.0.1 -> 2
    192.168.0.2 -> 3
    192.168.0.3 -> 1

for example. This happened because we are using coreos/fleetd to schedule the kafka processes across the cluster: each unit has an id which it keeps, but it's not tied to a specific instance. We're working on making the id a property of the server, not the unit, so that this wouldn't happen any more for us.

I've not tried the case where the ip -> id mapping doesn't change; given the above setup that's hard for us to test.

Thanks,
Dan

On 8 May 2015 at 18:13, Mayuresh Gharat gharatmayures...@gmail.com wrote:

Also it would be great to know if you see the same issue when you don't have a different ip -> broker id mapping. Also it would be great if you can explain what a different ip -> broker id mapping means, as Becket said.

Thanks,
Mayuresh

On Fri, May 8, 2015 at 9:48 AM, Mayuresh Gharat gharatmayures...@gmail.com wrote:

It should do an updateMetadataRequest in case it gets NOT_LEADER_FOR_PARTITION. This looks like a bug.

Thanks,
Mayuresh

On Fri, May 8, 2015 at 8:53 AM, Dan danharve...@gmail.com wrote:

Hi,

We've noticed an issue on our staging environment where all 3 of our Kafka hosts shut down and came back with a different ip -> broker id mapping. I know this is not good and we're fixing that separately. But what we noticed is all the consumers recovered but the producers got stuck with the following exceptions:

    WARN 2015-05-08 09:19:56,347 org.apache.kafka.clients.producer.internals.Sender: Got error produce response with correlation id 3544968 on topic-partition samza-metrics-0, retrying (2145750068 attempts left). Error: NOT_LEADER_FOR_PARTITION
    WARN 2015-05-08 09:19:56,448 org.apache.kafka.clients.producer.internals.Sender: Got error produce response with correlation id 3544970 on topic-partition samza-metrics-0, retrying (2145750067 attempts left). Error: NOT_LEADER_FOR_PARTITION
    WARN 2015-05-08 09:19:56,549 org.apache.kafka.clients.producer.internals.Sender: Got error produce response with correlation id 3544972 on topic-partition samza-metrics-0, retrying (2145750066 attempts left). Error: NOT_LEADER_FOR_PARTITION
    WARN 2015-05-08 09:19:56,649 org.apache.kafka.clients.producer.internals.Sender: Got error produce response with correlation id 3544974 on topic-partition samza-metrics-0, retrying (2145750065 attempts left). Error: NOT_LEADER_FOR_PARTITION
    WARN 2015-05-08 09:19:56,749 org.apache.kafka.clients.producer.internals.Sender: Got error produce response with correlation id 3544976 on topic-partition samza-metrics-0, retrying (2145750064 attempts left). Error: NOT_LEADER_FOR_PARTITION
    WARN 2015-05-08 09:19:56,850 org.apache.kafka.clients.producer.internals.Sender: Got error produce response with correlation id 3544978 on topic-partition samza-metrics-0, retrying (2145750063 attempts left). Error: NOT_LEADER_FOR_PARTITION
    WARN 2015-05-08 09:19:56,949 org.apache.kafka.clients.producer.internals.Sender: Got error produce response with correlation id 3544980 on topic-partition samza-metrics-0, retrying (2145750062 attempts left). Error: NOT_LEADER_FOR_PARTITION
    WARN 2015-05-08 09:19:57,049 org.apache.kafka.clients.producer.internals.Sender: Got error produce response with correlation id 3544982 on topic-partition samza-metrics-0, retrying (2145750061 attempts left). Error: NOT_LEADER_FOR_PARTITION
    WARN 2015-05-08 09:19:57,150 org.apache.kafka.clients.producer.internals.Sender: Got error produce response with correlation id 3544984 on topic-partition samza-metrics-0, retrying (2145750060 attempts left). Error: NOT_LEADER_FOR_PARTITION
    WARN 2015-05-08 09:19:57,254 org.apache.kafka.clients.producer.internals.Sender: Got error produce response with correlation id 3544986 on topic-partition samza-metrics-0, retrying (2145750059 attempts left). Error: NOT_LEADER_FOR_PARTITION
    WARN 2015-05-08 09:19:57,351 org.apache.kafka.clients.producer.internals.Sender: Got error produce response with correlation id 3544988 on topic-partition samza-metrics-0, retrying (2145750058 attempts left). Error: NOT_LEADER_FOR_PARTITION
    WARN 2015-05-08 09:19:57,454 org.apache.kafka.clients.producer.internals.Sender: Got error produce response with correlation id 3544990 on topic-partition samza-metrics-0, retrying (2145750057 attempts left). Error: NOT_LEADER_FOR_PARTITION

So it appears as if the producer did not refresh the metadata once the brokers had come back up. The exceptions carried on for a few hours until we restarted them. We noticed this in both 0.8.2.1 Java clients and via Kafka-rest (https://github.com/confluentinc/kafka-rest), which is
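The "2145750068 attempts left" in the logs suggests an effectively unbounded retry count, which keeps the producer spinning on NOT_LEADER_FOR_PARTITION instead of surfacing the failure. As a mitigation while the underlying bug stands, one could bound retries and shorten the metadata refresh interval via the standard 0.8.2 new-producer configs retries, retry.backoff.ms, and metadata.max.age.ms. A sketch with illustrative values, not tuned recommendations:

```java
import java.util.Properties;

// Hedged sketch: producer settings that bound retries and force periodic
// metadata refreshes, so a stale leader mapping gets re-fetched rather than
// retried forever. All values below are illustrative.
public class ProducerSettings {
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092");
        props.put("retries", "5");                 // bound retries instead of the huge count seen in the logs
        props.put("retry.backoff.ms", "500");      // back off between attempts
        props.put("metadata.max.age.ms", "60000"); // refresh metadata at least once a minute
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("metadata.max.age.ms"));
    }
}
```

Note this only limits how long the producer spins; it does not address brokers coming back with shuffled ids, which still needs the fix Dan describes of pinning the broker id to the server.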