Sounds like the issue is with Kafka channel, it is closing. Reconnect due to socket error: java.nio.channels.ClosedChannelException
Can you relax that val ssc = new StreamingContext(sparkConf, Seconds(20) Also how are you getting your source data? You can actually have both Spark and the output below at the same time running tol see the exact cause of it ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181 --from-beginning --topic newtopic Dr Mich Talebzadeh LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* http://talebzadehmich.wordpress.com On 7 June 2016 at 11:32, Dominik Safaric <dominiksafa...@gmail.com> wrote: > Unfortunately, even with this Spark configuration and Kafka parameters, > the same exception keeps occurring: > > 16/06/07 12:26:11 INFO SimpleConsumer: Reconnect due to socket error: > java.nio.channels.ClosedChannelException > org.apache.spark.SparkException: java.nio.channels.ClosedChannelException > org.apache.spark.SparkException: Couldn't find leader offsets for > Set([<topicname>,0]) > at > org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) > at > org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) > at scala.util.Either.fold(Either.scala:97) > at > org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365) > at > org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222) > > If it helps for troubleshooting, here are the logs of the Kafka server: > > 16-06-07 10:24:58,349] INFO Initiating client connection, > connectString=localhost:2181 sessionTimeout=6000 > watcher=org.I0Itec.zkclient.ZkClient@4e05faa7 > (org.apache.zookeeper.ZooKeeper) > [2016-06-07 10:24:58,365] INFO Opening socket connection to server > localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL > (unknown error) (org.apache.zookeeper.ClientCnxn) > [2016-06-07 10:24:58,365] INFO Waiting for keeper state SyncConnected > (org.I0Itec.zkclient.ZkClient) > [2016-06-07 10:24:58,375] INFO Socket connection established to localhost/ > 127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn) > [2016-06-07 10:24:58,405] INFO Session establishment complete on server > localhost/127.0.0.1:2181, sessionid = 0x1552a64a9a80000, negotiated > timeout = 6000 (org.apache.zookeeper.ClientCnxn) > [2016-06-07 10:24:58,408] INFO zookeeper state changed (SyncConnected) > (org.I0Itec.zkclient.ZkClient) > [2016-06-07 10:24:58,562] INFO Loading logs. (kafka.log.LogManager) > [2016-06-07 10:24:58,608] INFO Completed load of log <topic_name>-0 with > log end offset 15 (kafka.log.Log) > [2016-06-07 10:24:58,614] INFO Completed load of log _schemas-0 with log > end offset 1 (kafka.log.Log) > [2016-06-07 10:24:58,617] INFO Completed load of log <topic_name>-0 with > log end offset 5 (kafka.log.Log) > [2016-06-07 10:24:58,620] INFO Completed load of log <topic_name>-0 with > log end offset 2 (kafka.log.Log) > [2016-06-07 10:24:58,629] INFO Completed load of log <topic_name>-0 with > log end offset 1759 (kafka.log.Log) > [2016-06-07 10:24:58,635] INFO Logs loading complete. > (kafka.log.LogManager) > [2016-06-07 10:24:58,737] INFO Starting log cleanup with a period of > 300000 ms. (kafka.log.LogManager) > [2016-06-07 10:24:58,739] INFO Starting log flusher with a default period > of 9223372036854775807 ms. (kafka.log.LogManager) > [2016-06-07 10:24:58,798] INFO Awaiting socket connections on 0.0.0.0:9092. > (kafka.network.Acceptor) > [2016-06-07 10:24:58,809] INFO [Socket Server on Broker 1], Started 1 > acceptor threads (kafka.network.SocketServer) > [2016-06-07 10:24:58,849] INFO [ExpirationReaper-1], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > [2016-06-07 10:24:58,850] INFO [ExpirationReaper-1], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > [2016-06-07 10:24:58,953] INFO Creating /controller (is it secure? false) > (kafka.utils.ZKCheckedEphemeral) > [2016-06-07 10:24:58,973] INFO Result of znode creation is: OK > (kafka.utils.ZKCheckedEphemeral) > [2016-06-07 10:24:58,974] INFO 1 successfully elected as leader > (kafka.server.ZookeeperLeaderElector) > [2016-06-07 10:24:59,180] INFO [GroupCoordinator 1]: Starting up. > (kafka.coordinator.GroupCoordinator) > [2016-06-07 10:24:59,191] INFO [ExpirationReaper-1], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > [2016-06-07 10:24:59,194] INFO New leader is 1 > (kafka.server.ZookeeperLeaderElector$LeaderChangeListener) > [2016-06-07 10:24:59,198] INFO [Group Metadata Manager on Broker 1]: > Removed 0 expired offsets in 16 milliseconds. > (kafka.coordinator.GroupMetadataManager) > [2016-06-07 10:24:59,195] INFO [ExpirationReaper-1], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > [2016-06-07 10:24:59,195] INFO [GroupCoordinator 1]: Startup complete. > (kafka.coordinator.GroupCoordinator) > [2016-06-07 10:24:59,215] INFO [ThrottledRequestReaper-Produce], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > [2016-06-07 10:24:59,217] INFO [ThrottledRequestReaper-Fetch], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > [2016-06-07 10:24:59,220] INFO Will not load MX4J, mx4j-tools.jar is not > in the classpath (kafka.utils.Mx4jLoader$) > [2016-06-07 10:24:59,230] INFO Creating /brokers/ids/1 (is it secure? > false) (kafka.utils.ZKCheckedEphemeral) > [2016-06-07 10:24:59,244] INFO Result of znode creation is: OK > (kafka.utils.ZKCheckedEphemeral) > [2016-06-07 10:24:59,245] INFO Registered broker 1 at path /brokers/ids/1 > with addresses: PLAINTEXT -> EndPoint(<public_DNS>,9092,PLAINTEXT) > (kafka.utils.ZkUtils) > [2016-06-07 10:24:59,257] INFO Kafka version : 0.9.0.1 > (org.apache.kafka.common.utils.AppInfoParser) > [2016-06-07 10:24:59,257] INFO Kafka commitId : 23c69d62a0cabf06 > (org.apache.kafka.common.utils.AppInfoParser) > [2016-06-07 10:24:59,258] INFO [Kafka Server 1], started > (kafka.server.KafkaServer) > [2016-06-07 10:24:59,648] INFO [ReplicaFetcherManager on broker 1] Removed > fetcher for partitions [<topic_name>,0] (kafka.server.ReplicaFetcherManager) > [2016-06-07 10:24:59,682] INFO [ReplicaFetcherManager on broker 1] Removed > fetcher for partitions [<topic_name>,0] (kafka.server.ReplicaFetcherManager) > > Whereas Zookeeper produced the following logs: > > [2016-06-07 10:24:47,935] INFO Server > environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib > (org.apache.zookeeper.server.ZooKeeperServer) > [2016-06-07 10:24:47,935] INFO Server environment:java.io.tmpdir=/tmp > (org.apache.zookeeper.server.ZooKeeperServer) > [2016-06-07 10:24:47,935] INFO Server environment:java.compiler=<NA> > (org.apache.zookeeper.server.ZooKeeperServer) > [2016-06-07 10:24:47,936] INFO Server environment:os.name=Linux > (org.apache.zookeeper.server.ZooKeeperServer) > [2016-06-07 10:24:47,936] INFO Server environment:os.arch=amd64 > (org.apache.zookeeper.server.ZooKeeperServer) > [2016-06-07 10:24:47,939] INFO Server > environment:os.version=4.4.11-23.53.amzn1.x86_64 > (org.apache.zookeeper.server.ZooKeeperServer) > [2016-06-07 10:24:47,939] INFO Server environment:user.name=ec2-user > (org.apache.zookeeper.server.ZooKeeperServer) > [2016-06-07 10:24:47,939] INFO Server environment:user.home=/home/ec2-user > (org.apache.zookeeper.server.ZooKeeperServer) > [2016-06-07 10:24:47,939] INFO Server > environment:user.dir=/home/ec2-user/kafka_2.11-0.9.0.1 > (org.apache.zookeeper.server.ZooKeeperServer) > [2016-06-07 10:24:47,946] INFO tickTime set to 3000 > (org.apache.zookeeper.server.ZooKeeperServer) > [2016-06-07 10:24:47,946] INFO minSessionTimeout set to -1 > (org.apache.zookeeper.server.ZooKeeperServer) > [2016-06-07 10:24:47,946] INFO maxSessionTimeout set to -1 > (org.apache.zookeeper.server.ZooKeeperServer) > [2016-06-07 10:24:47,955] INFO binding to port 0.0.0.0/0.0.0.0:2181 > (org.apache.zookeeper.server.NIOServerCnxnFactory) > [2016-06-07 10:24:58,370] INFO Accepted socket connection from / > 127.0.0.1:41368 (org.apache.zookeeper.server.NIOServerCnxnFactory) > [2016-06-07 10:24:58,384] INFO Client attempting to establish new session > at /127.0.0.1:41368 (org.apache.zookeeper.server.ZooKeeperServer) > [2016-06-07 10:24:58,389] INFO Creating new log file: log.3eb > (org.apache.zookeeper.server.persistence.FileTxnLog) > [2016-06-07 10:24:58,400] INFO Established session 0x1552a64a9a80000 with > negotiated timeout 6000 for client /127.0.0.1:41368 > (org.apache.zookeeper.server.ZooKeeperServer) > [2016-06-07 10:24:59,154] INFO Got user-level KeeperException when > processing sessionid:0x1552a64a9a80000 type:delete cxid:0x26 zxid:0x3ee > txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election > Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election > (org.apache.zookeeper.server.PrepRequestProcessor) > [2016-06-07 10:24:59,231] INFO Got user-level KeeperException when > processing sessionid:0x1552a64a9a80000 type:create cxid:0x2d zxid:0x3ef > txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = > NodeExists for /brokers (org.apache.zookeeper.server.PrepRequestProcessor) > [2016-06-07 10:24:59,232] INFO Got user-level KeeperException when > processing sessionid:0x1552a64a9a80000 type:create cxid:0x2e zxid:0x3f0 > txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = > NodeExists for /brokers/ids > (org.apache.zookeeper.server.PrepRequestProcessor) > > Interestedly, I am able to both retrieve the messages from the specified > topic using the console Consumer and produce messages using the REST API. > > As for Kafka/Zookeeper accessibility, since this is a proof-of-concept, > all connections to the ports have been allowed. > > On 07 Jun 2016, at 12:14, Mich Talebzadeh <mich.talebza...@gmail.com> > wrote: > > ok that is good > > Yours is basically simple streaming with Kafka (publishing topic) and your > Spark streaming. use the following as blueprint > > // Create a local StreamingContext with two working thread and batch > interval of 2 seconds. > val sparkConf = new SparkConf(). > setAppName("CEP_streaming"). > setMaster("local[2]"). > set("spark.executor.memory", "4G"). > set("spark.cores.max", "2"). > set("spark.streaming.concurrentJobs", "2"). > set("spark.driver.allowMultipleContexts", "true"). > set("spark.hadoop.validateOutputSpecs", "false") > val ssc = new StreamingContext(sparkConf, Seconds(2)) > ssc.checkpoint("checkpoint") > val kafkaParams = Map[String, String]("bootstrap.servers" -> > "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081", > "zookeeper.connect" -> "rhes564:2181", "group.id" -> "CEP_streaming" ) > val topics = Set("newtopic") > val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, > StringDecoder](ssc, kafkaParams, topics) > dstream.cache() > > val lines = dstream.map(_._2) > val price = lines.map(_.split(',').view(2)).map(_.toFloat) > // window length - The duration of the window below that must be multiple > of batch interval n in = > StreamingContext(sparkConf, Seconds(n)) > val windowLength = 4 > // sliding interval - The interval at which the window operation is > performed in other words data is collected within this "previous interval' > val slidingInterval = 2 // keep this the same as batch window for > continuous streaming. You are aggregating data that you are collecting over > the batch Window > val countByValueAndWindow = price.filter(_ > > 95.0).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval)) > countByValueAndWindow.print() > // > ssc.start() > ssc.awaitTermination() > > HTH > > Dr Mich Talebzadeh > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > http://talebzadehmich.wordpress.com > > > > On 7 June 2016 at 10:58, Dominik Safaric <dominiksafa...@gmail.com> wrote: > >> Dear Mich, >> >> Thank you for the reply. >> >> By running the following command in the command line: >> >> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic >> <topic_name> --from-beginning >> >> I do indeed retrieve all messages of a topic. >> >> Any indication onto what might cause the issue? >> >> An important note to make, I’m using the default configuration of both >> Kafka and Zookeeper. >> >> On 07 Jun 2016, at 11:39, Mich Talebzadeh <mich.talebza...@gmail.com> >> wrote: >> >> I assume you zookeeper is up and running >> >> can you confirm that you are getting topics from kafka independently for >> example on the command line >> >> ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181 >> --from-beginning --topic newtopic >> >> >> >> >> >> Dr Mich Talebzadeh >> >> >> LinkedIn * >> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >> >> >> http://talebzadehmich.wordpress.com >> >> >> >> On 7 June 2016 at 10:06, Dominik Safaric <dominiksafa...@gmail.com> >> wrote: >> >>> As I am trying to integrate Kafka into Spark, the following exception >>> occurs: >>> >>> org.apache.spark.SparkException: java.nio.channels.ClosedChannelException >>> org.apache.spark.SparkException: Couldn't find leader offsets for >>> Set([*<topicName>*,0]) >>> at >>> >>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) >>> at >>> >>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) >>> at scala.util.Either.fold(Either.scala:97) >>> at >>> >>> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365) >>> at >>> >>> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222) >>> at >>> >>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484) >>> at org.mediasoft.spark.Driver$.main(Driver.scala:42) >>> at .<init>(<console>:11) >>> at .<clinit>(<console>) >>> at .<init>(<console>:7) >>> at .<clinit>(<console>) >>> at $print(<console>) >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>> at >>> >>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>> at >>> >>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>> at java.lang.reflect.Method.invoke(Method.java:483) >>> at >>> scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734) >>> at >>> scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983) >>> at >>> scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573) >>> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604) >>> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568) >>> at >>> scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760) >>> at >>> scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805) >>> at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717) >>> at >>> scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581) >>> at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588) >>> at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591) >>> at >>> >>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882) >>> at >>> >>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837) >>> at >>> >>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837) >>> at >>> >>> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) >>> at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837) >>> at scala.tools.nsc.interpreter.ILoop.main(ILoop.scala:904) >>> at >>> >>> org.jetbrains.plugins.scala.compiler.rt.ConsoleRunner.main(ConsoleRunner.java:64) >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>> at >>> >>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>> at >>> >>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>> at java.lang.reflect.Method.invoke(Method.java:483) >>> at >>> com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) >>> >>> As for the Spark configuration: >>> >>> val conf: SparkConf = new >>> SparkConf().setAppName("AppName").setMaster("local[2]") >>> >>> val confParams: Map[String, String] = Map( >>> "metadata.broker.list" -> "<IP_ADDRESS>:9092", >>> "auto.offset.reset" -> "largest" >>> ) >>> >>> val topics: Set[String] = Set("<topic_name>") >>> >>> val context: StreamingContext = new StreamingContext(conf, >>> Seconds(1)) >>> val kafkaStream = KafkaUtils.createDirectStream(context,confParams, >>> topics) >>> >>> kafkaStream.foreachRDD(rdd => { >>> rdd.collect().foreach(println) >>> }) >>> >>> context.awaitTermination() >>> context.start() >>> >>> The Kafka topic does exist, Kafka server is up and running and I am able >>> to >>> produce messages to that particular topic using the Confluent REST API. >>> >>> What might the problem actually be? >>> >>> >>> >>> >>> -- >>> View this message in context: >>> http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-Kafka-Integration-org-apache-spark-SparkException-Couldn-t-find-leader-offsets-for-Set-tp27103.html >>> Sent from the Apache Spark User List mailing list archive at Nabble.com >>> <http://nabble.com/>. >>> >>> --------------------------------------------------------------------- >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>> For additional commands, e-mail: user-h...@spark.apache.org >>> >>> >> >> > >