Cody et al.,

I am seeing a similar error. I've increased the number of retries, and once a job is up and running I see it retry correctly. However, I am having trouble getting the job started in the first place: the number of retries does not seem to affect startup behavior.

Thoughts? A rough sketch of what I mean is below.
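For concreteness, here is a minimal sketch of the kind of startup handling I have in mind. The broker list, topic name, batch interval, and retry count are placeholders, and the retry wrapper around createDirectStream is a hand-rolled workaround, not something built into Spark:

    import kafka.serializer.StringDecoder
    import org.apache.spark.{SparkConf, SparkException}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka.KafkaUtils

    object KafkaStartupRetry {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("kafka-startup-retry")
          // Retries transient leader-offset fetches once the stream is
          // running (the property Cody mentions below; default is 1)...
          .set("spark.streaming.kafka.maxRetries", "5")
        val ssc = new StreamingContext(conf, Seconds(1))

        val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
        val topics = Set("Allergy")

        // ...but it does not appear to cover the initial leader-offset
        // lookup that createDirectStream performs, so retry that here.
        def createWithRetry(attemptsLeft: Int): InputDStream[(String, String)] =
          try {
            KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
              ssc, kafkaParams, topics)
          } catch {
            case _: SparkException if attemptsLeft > 1 =>
              Thread.sleep(2000) // give the brokers a moment before retrying
              createWithRetry(attemptsLeft - 1)
          }

        val stream = createWithRetry(5)
        stream.foreachRDD(rdd => println(s"batch count: ${rdd.count()}"))

        ssc.start()
        ssc.awaitTermination()
      }
    }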
Regards,

Bryan Jeffrey

On Fri, Mar 18, 2016 at 10:44 AM, Cody Koeninger <c...@koeninger.org> wrote:
> That's a networking error when the driver is attempting to contact the
> leaders to get the latest available offsets.
>
> If it's a transient error, you can look at increasing the value of
> spark.streaming.kafka.maxRetries, see
> http://spark.apache.org/docs/latest/configuration.html
>
> If it's not a transient error, you need to look at your brokers and your
> network environment.
>
> On Thu, Mar 17, 2016 at 10:59 PM, Surendra Manchikanti
> <surendra.manchika...@gmail.com> wrote:
> > Hi,
> >
> > Can you check the Kafka topic replication and the leader information?
> >
> > Regards,
> > Surendra M
> >
> > On Thu, Mar 17, 2016 at 7:28 PM, Ascot Moss <ascot.m...@gmail.com> wrote:
> >> Hi,
> >>
> >> I have a Spark Streaming (with Kafka) job that failed after running for
> >> several days with the following error:
> >>
> >> ERROR DirectKafkaInputDStream:
> >> ArrayBuffer(java.nio.channels.ClosedChannelException)
> >>
> >> Any idea what could be wrong? Could it be a Spark Streaming buffer
> >> overflow issue?
> >>
> >> Regards
> >>
> >> *** from the log ***
> >>
> >> 16/03/18 09:15:18 INFO VerifiableProperties: Property zookeeper.connect is overridden to
> >> 16/03/17 12:13:51 ERROR DirectKafkaInputDStream: ArrayBuffer(java.nio.channels.ClosedChannelException)
> >> 16/03/17 12:13:52 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
> >> 16/03/17 12:13:52 ERROR JobScheduler: Error generating jobs for time 1458188031800 ms
> >> org.apache.spark.SparkException: ArrayBuffer(java.nio.channels.ClosedChannelException)
> >>     at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
> >>     at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
> >>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> >>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> >>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> >>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> >>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> >>     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> >>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> >>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> >>     at scala.Option.orElse(Option.scala:257)
> >>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> >>     at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> >>     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> >>     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> >>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> >>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> >>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> >>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> >>     at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
> >>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
> >>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
> >>     at scala.util.Try$.apply(Try.scala:161)
> >>     at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
> >>     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
> >>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
> >>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
> >>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> >>
> >> Exception in thread "main" org.apache.spark.SparkException: ArrayBuffer(java.nio.channels.ClosedChannelException)
> >>     [snip: identical stack trace to the one above]
> >> 16/03/17 12:13:52 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
> >> 16/03/17 12:13:52 INFO JobGenerator: Stopping JobGenerator immediately
> >> 16/03/17 12:13:52 INFO RecurringTimer: Stopped timer for JobGenerator after time 1458188032000
> >> 16/03/17 12:13:52 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
> >> 16/03/17 12:13:52 ERROR DirectKafkaInputDStream: ArrayBuffer(java.nio.channels.ClosedChannelException)
> >> 16/03/17 12:13:52 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
> >> 16/03/17 12:13:52 ERROR JobScheduler: Error generating jobs for time 1458188031900 ms
> >> org.apache.spark.SparkException: ArrayBuffer(java.nio.channels.ClosedChannelException, org.apache.spark.SparkException: Couldn't find leader offsets for Set([Allergy,0]))
> >>     [snip: identical stack trace to the one above]
> >> 16/03/17 12:13:52 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
> >> 16/03/17 12:13:52 ERROR DirectKafkaInputDStream: ArrayBuffer(java.nio.channels.ClosedChannelException)
> >> 16/03/17 12:13:52 INFO JobGenerator: Stopped JobGenerator
> >> 16/03/17 12:13:52 INFO JobScheduler: Stopped JobScheduler
> >> 16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming,null}
> >>     [snip: the remaining ServletContextHandler stop messages]
> >> 16/03/17 12:13:52 INFO SparkUI: Stopped Spark web UI at http://192.168.10.31:4042
> >> 16/03/17 12:13:52 INFO DAGScheduler: Stopping DAGScheduler
> >> 16/03/17 12:13:52 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
> >> 16/03/17 12:13:52 INFO MemoryStore: MemoryStore cleared
> >> 16/03/17 12:13:52 INFO BlockManager: BlockManager stopped
> >> 16/03/17 12:13:52 INFO BlockManagerMaster: BlockManagerMaster stopped
> >> 16/03/17 12:13:52 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
> >> 16/03/17 12:13:52 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
> >> 16/03/17 12:13:52 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
> >> 16/03/17 12:13:52 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
> >> 16/03/17 12:13:52 INFO SparkContext: Successfully stopped SparkContext
> >> 16/03/17 12:13:52 INFO ShutdownHookManager: Shutdown hook called
> >> 16/03/17 12:13:52 INFO ShutdownHookManager: Deleting directory /tmp/spark-c6376f77-9f5a-4f76-bbaf-6fa8eb37870a
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
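P.S. For the replication / leader check Surendra suggests in the quoted thread, the stock Kafka CLI should be enough (the ZooKeeper address here is a placeholder):

    bin/kafka-topics.sh --describe --zookeeper zk1:2181 --topic Allergy

The output lists the leader, replicas, and in-sync replicas (Isr) for each partition, which should make it clear whether the topic had a live leader at the time the job tried to start.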