Hi,

Correct, I am using the 0.9.0.1 version. 

As already described, the topic contains messages. Those messages are produced 
using the Confluence REST API.

However, what I’ve observed is that the problem is not in the Spark 
configuration, but rather Zookeeper or Kafka related. 

Take a look at the exception’s stack top item:

org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for 
Set([<topicname>,0])
        at 
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at 
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at 
org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at 
org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
        at 
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
        at org.mediasoft.spark.Driver$.main(Driver.scala:22)
        at .<init>(<console>:11)
        at .<clinit>(<console>)
        at .<init>(<console>:7)

By listing all active connections using netstat, I’ve also observed that both 
Zookeper and Kafka are running. Zookeeper on port 2181, while Kafka 9092. 

Furthermore, I am also able to retrieve all log messages using the console 
consumer.

Any clue what might be going wrong?

> On 07 Jun 2016, at 13:13, Jacek Laskowski <ja...@japila.pl> wrote:
> 
> Hi,
> 
> What's the version of Spark? You're using Kafka 0.9.0.1, ain't you? What's 
> the topic name?
> 
> Jacek
> 
> On 7 Jun 2016 11:06 a.m., "Dominik Safaric" <dominiksafa...@gmail.com 
> <mailto:dominiksafa...@gmail.com>> wrote:
> As I am trying to integrate Kafka into Spark, the following exception occurs:
> 
> org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
> org.apache.spark.SparkException: Couldn't find leader offsets for
> Set([*<topicName>*,0])
>         at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>         at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>         at scala.util.Either.fold(Either.scala:97)
>         at
> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>         at
> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>         at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>         at org.mediasoft.spark.Driver$.main(Driver.scala:42)
>         at .<init>(<console>:11)
>         at .<clinit>(<console>)
>         at .<init>(<console>:7)
>         at .<clinit>(<console>)
>         at $print(<console>)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:483)
>         at 
> scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
>         at 
> scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
>         at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>         at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
>         at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
>         at 
> scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
>         at 
> scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
>         at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
>         at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
>         at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
>         at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
>         at
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
>         at
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>         at
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>         at
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>         at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
>         at scala.tools.nsc.interpreter.ILoop.main(ILoop.scala:904)
>         at
> org.jetbrains.plugins.scala.compiler.rt.ConsoleRunner.main(ConsoleRunner.java:64)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:483)
>         at 
> com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> 
> As for the Spark configuration:
> 
>    val conf: SparkConf = new
> SparkConf().setAppName("AppName").setMaster("local[2]")
> 
>     val confParams: Map[String, String] = Map(
>       "metadata.broker.list" -> "<IP_ADDRESS>:9092",
>       "auto.offset.reset" -> "largest"
>     )
> 
>     val topics: Set[String] = Set("<topic_name>")
> 
>     val context: StreamingContext = new StreamingContext(conf, Seconds(1))
>     val kafkaStream = KafkaUtils.createDirectStream(context,confParams,
> topics)
> 
>     kafkaStream.foreachRDD(rdd => {
>       rdd.collect().foreach(println)
>     })
> 
>     context.awaitTermination()
>     context.start()
> 
> The Kafka topic does exist, Kafka server is up and running and I am able to
> produce messages to that particular topic using the Confluent REST API.
> 
> What might the problem actually be?
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-Kafka-Integration-org-apache-spark-SparkException-Couldn-t-find-leader-offsets-for-Set-tp27103.html
>  
> <http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-Kafka-Integration-org-apache-spark-SparkException-Couldn-t-find-leader-offsets-for-Set-tp27103.html>
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> For additional commands, e-mail: user-h...@spark.apache.org 
> <mailto:user-h...@spark.apache.org>
> 

Reply via email to