Hi Dominik,

Right, and Spark 1.6.x uses Kafka v0.8.2.x as I recall. However, it appears
that the v0.8 consumer is compatible with a Kafka v0.9.x broker, but not the
other way around; sorry for the confusion there.

With the direct stream (simple consumer), offsets are tracked by Spark
Streaming within its checkpoints by default. You can also manage them
yourself if desired, along the lines of the sketch below.
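A rough, untested sketch of doing it yourself (it assumes String keys/values,
the confParams and context from your snippet below, and placeholder starting
offsets; persist the ranges wherever suits you):

  import kafka.common.TopicAndPartition
  import kafka.message.MessageAndMetadata
  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

  // Resume from offsets you stored yourself (placeholder values here).
  val fromOffsets: Map[TopicAndPartition, Long] =
    Map(TopicAndPartition("<TOPIC>", 0) -> 0L)

  val stream = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder, (String, String)](
    context, confParams, fromOffsets,
    (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))

  stream.foreachRDD { rdd =>
    // Each batch exposes the offset ranges it covered; persist these
    // (ZooKeeper, a database, ...) so you can restart from them later.
    val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    ranges.foreach(r =>
      println(s"${r.topic}/${r.partition}: ${r.fromOffset} -> ${r.untilOffset}"))
  }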
How are you dealing with offsets? Can you verify the offsets on the broker:

  kafka-run-class.sh kafka.tools.GetOffsetShell --topic <TOPIC> \
    --broker-list <BROKER-IP:PORT> --time -1
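It may also be worth confirming that every partition of the topic reports a
live leader, since "Couldn't find leader offsets" usually means the topic
metadata came back without one. Something along these lines (script name and
ZooKeeper address assumed; adjust to your installation):

  kafka-topics.sh --describe --zookeeper <ZOOKEEPER-IP:2181> --topic <TOPIC>

The Leader column should show a live broker id for every partition.

-Todd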
On Tue, Jun 7, 2016 at 8:17 AM, Dominik Safaric <dominiksafa...@gmail.com>
wrote:

> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0"
> libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0"
> libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1"
>
> Please take a look at the SBT copy.
>
> I would rather think that the problem is related to the ZooKeeper/Kafka
> consumers.
>
> [2016-06-07 11:24:52,484] WARN Either no config or no quorum defined in
> config, running in standalone mode
> (org.apache.zookeeper.server.quorum.QuorumPeerMain)
>
> Any indication as to why the channel connection might be closed? Would it
> be Kafka or ZooKeeper related?
>
> On 07 Jun 2016, at 14:07, Todd Nist <tsind...@gmail.com> wrote:
>
> What version of Spark are you using? I do not believe that 1.6.x is
> compatible with 0.9.0.1 due to changes in the Kafka clients between
> 0.8.2.2 and 0.9.0.x. See this for more information:
>
> https://issues.apache.org/jira/browse/SPARK-12177
>
> -Todd
>
> On Tue, Jun 7, 2016 at 7:35 AM, Dominik Safaric <dominiksafa...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Correct, I am using the 0.9.0.1 version.
>>
>> As already described, the topic contains messages. Those messages are
>> produced using the Confluent REST API.
>>
>> However, what I've observed is that the problem is not in the Spark
>> configuration, but rather ZooKeeper or Kafka related.
>>
>> Take a look at the top of the exception's stack trace:
>>
>> org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
>> org.apache.spark.SparkException: Couldn't find leader offsets for
>> Set([<topicname>,0])
>>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>     at scala.util.Either.fold(Either.scala:97)
>>     at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>>     at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>>     at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>>     at org.mediasoft.spark.Driver$.main(Driver.scala:22)
>>     at .<init>(<console>:11)
>>     at .<clinit>(<console>)
>>     at .<init>(<console>:7)
>>
>> By listing all active connections using netstat, I've also verified that
>> both ZooKeeper and Kafka are running: ZooKeeper on port 2181 and Kafka on
>> port 9092.
>>
>> Furthermore, I am also able to retrieve all of the topic's messages using
>> the console consumer.
>>
>> Any clue what might be going wrong?
>>
>> On 07 Jun 2016, at 13:13, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>> Hi,
>>
>> What's the version of Spark? You're using Kafka 0.9.0.1, ain't you?
>> What's the topic name?
>>
>> Jacek
>>
>> On 7 Jun 2016 11:06 a.m., "Dominik Safaric" <dominiksafa...@gmail.com>
>> wrote:
>>
>>> As I am trying to integrate Kafka into Spark, the following exception
>>> occurs:
>>>
>>> org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>> Set([<topicName>,0])
>>>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>>     at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>>>     at scala.util.Either.fold(Either.scala:97)
>>>     at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>>>     at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
>>>     at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>>>     at org.mediasoft.spark.Driver$.main(Driver.scala:42)
>>>     at .<init>(<console>:11)
>>>     at .<clinit>(<console>)
>>>     at .<init>(<console>:7)
>>>     at .<clinit>(<console>)
>>>     at $print(<console>)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:483)
>>>     at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
>>>     at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
>>>     at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>>>     at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
>>>     at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
>>>     at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
>>>     at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
>>>     at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
>>>     at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
>>>     at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
>>>     at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
>>>     at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
>>>     at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>>>     at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>>>     at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>>>     at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
>>>     at scala.tools.nsc.interpreter.ILoop.main(ILoop.scala:904)
>>>     at org.jetbrains.plugins.scala.compiler.rt.ConsoleRunner.main(ConsoleRunner.java:64)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:483)
>>>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>>>
>>> As for the Spark configuration:
>>>
>>> import kafka.serializer.StringDecoder
>>> import org.apache.spark.SparkConf
>>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>
>>> val conf: SparkConf = new
>>> SparkConf().setAppName("AppName").setMaster("local[2]")
>>>
>>> val confParams: Map[String, String] = Map(
>>>   "metadata.broker.list" -> "<IP_ADDRESS>:9092",
>>>   "auto.offset.reset" -> "largest"
>>> )
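>>> // Side note on these parameters: "metadata.broker.list" is the broker
>>> // bootstrap list used by the 0.8-style simple consumer, and it has to be
>>> // reachable from the machine running the driver (the leader-offset
>>> // lookup happens there). "auto.offset.reset" -> "largest" starts the
>>> // stream from the newest offsets when no explicit offsets are supplied.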
>>> val topics: Set[String] = Set("<topic_name>")
>>>
>>> val context: StreamingContext = new StreamingContext(conf, Seconds(1))
>>> val kafkaStream = KafkaUtils.createDirectStream[String, String,
>>>   StringDecoder, StringDecoder](context, confParams, topics)
>>>
>>> kafkaStream.foreachRDD(rdd => {
>>>   rdd.collect().foreach(println)
>>> })
>>>
>>> context.start()
>>> context.awaitTermination()
>>>
>>> The Kafka topic does exist, the Kafka server is up and running, and I am
>>> able to produce messages to that particular topic using the Confluent
>>> REST API.
>>>
>>> What might the problem actually be?