My bad, I got that exception in the driver code of the same job, not in the executor. But it only reports the socket close exception:
org.apache.spark.SparkException: ArrayBuffer(java.io.EOFException: Received -1 when reading from channel, socket has likely been closed., org.apache.spark.SparkException: Couldn't find leader offsets for Set([topicname,51], [topicname,201], [topicname,54], [topicname,93], [topicname,297], [topicname,123], [topicname,147], [topicname,126], [topicname,189], [topicname,111], [topicname,159], [topicname,33], [topicname,36], [topicname,60], [topicname,216], [topicname,9], [topicname,12], [topicname,282], [topicname,39], [topicname,63], [topicname,231], [topicname,279], [topicname,18], [topicname,30], [topicname,276], [topicname,228], [topicname,84], [topicname,252], [topicname,48], [topicname,150], [topicname,132], [topicname,57], [topicname,72], [topicname,291], [topicname,234], [topicname,204], [topicname,186], [topicname,264], [topicname,288], [topicname,87], [topicname,78], [topicname,249], [topicname,102], [topicname,108], [topicname,237], [topicname,24], [topicname,96], [topicname,135], [topicname,198], [topicname,162], [topicname,42], [topicname,258], [topicname,0], [topicname,174], [topicname,207], [topicname,210], [topicname,246], [topicname,225], [topicname,270], [topicname,156], [topicname,183], [topicname,144], [topicname,117], [topicname,69], [topicname,45], [topicname,219], [topicname,177], [topicname,105], [topicname,171], [topicname,141], [topicname,285], [topicname,27], [topicname,168], [topicname,267], [topicname,213], [topicname,153], [topicname,138], [topicname,255], [topicname,222], [topicname,243], [topicname,261], [topicname,90], [topicname,114], [topicname,3], [topicname,81], [topicname,180], [topicname,21], [topicname,6], [topicname,195], [topicname,129], [topicname,192], [topicname,99], [topicname,294], [topicname,165], [topicname,240], [topicname,66], [topicname,75], [topicname,15], [topicname,273], [topicname,120]))
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:243)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:241)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:241)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:177)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:86)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/09/10 02:36:02 ERROR yarn.ApplicationMaster: User class threw exception: ArrayBuffer(java.io.EOFException: Received -1 when reading from channel, socket has likely been closed., org.apache.spark.SparkException: Couldn't find leader offsets for Set([topicname,51], [topicname,201], [topicname,54], [topicname,93], [topicname,297], [topicname,123], [topicname,147], [topicname,126], [topicname,189], [topicname,111], [topicname,159], [topicname,33], [topicname,36], [topicname,60], [topicname,216], [topicname,9], [topicname,12], [topicname,282], [topicname,39], [topicname,63], [topicname,231], [topicname,279], [topicname,18], [topicname,30], [topicname,276], [topicname,228], [topicname,84], [topicname,252], [topicname,48], [topicname,150], [topicname,132], [topicname,57], [topicname,72], [topicname,291], [topicname,234], [topicname,204], [topicname,186], [topicname,264], [topicname,288], [topicname,87], [topicname,78], [topicname,249], [topicname,102], [topicname,108], [topicname,237], [topicname,24], [topicname,96], [topicname,135], [topicname,198], [topicname,162], [topicname,42], [topicname,258], [topicname,0], [topicname,174], [topicname,207], [topicname,210], [topicname,246], [topicname,225], [topicname,270], 
[topicname,156], [topicname,183], [topicname,144], [topicname,117], [topicname,69], [topicname,45], [topicname,219], [topicname,177], [topicname,105], [topicname,171], [topicname,141], [topicname,285], [topicname,27], [topicname,168], [topicname,267], [topicname,213], [topicname,153], [topicname,138], [topicname,255], [topicname,222], [topicname,243], [topicname,261], [topicname,90], [topicname,114], [topicname,3], [topicname,81], [topicname,180], [topicname,21], [topicname,6], [topicname,195], [topicname,129], [topicname,192], [topicname,99], [topicname,294], [topicname,165], [topicname,240], [topicname,66], [topicname,75], [topicname,15], [topicname,273], [topicname,120]))

On Thu, Sep 10, 2015 at 8:18 PM, Cody Koeninger <c...@koeninger.org> wrote:

> NotLeaderForPartitionException means you lost a kafka broker or had a
> rebalance... why did you say "I am getting Connection timeout in my code."
>
> You've asked questions about this exact same situation before; the answer
> remains the same.
>
> On Thu, Sep 10, 2015 at 9:44 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>
>> Stack trace is
>>
>> 15/09/09 22:49:52 ERROR kafka.KafkaRDD: Lost leader for topic topicname partition 99, sleeping for 200ms
>> kafka.common.NotLeaderForPartitionException
>>     at sun.reflect.GeneratedConstructorAccessor26.newInstance(Unknown Source)
>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>     at java.lang.Class.newInstance(Class.java:374)
>>     at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:73)
>>     at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:142)
>>     at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:151)
>>     at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
>>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>     at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
>>     at com.example.hadoop.spark.consumer.KafkaStreamTransformations.call(KafkaStreamTransformations.java:147)
>>     at com.example.hadoop.spark.consumer.KafkaStreamTransformations.call(KafkaStreamTransformations.java:35)
>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:198)
>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:198)
>>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
>>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
>>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
>>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>     at java.lang.Thread.run(Thread.java:745)
>> 15/09/09 22:49:52 ERROR consumer.KafkaStreamTransformations: Error while consuming messages from kafka
>>
>> Actual code is:
>>
>> In driver:
>>
>> final KafkaStreamTransformations transformations =
>>     new KafkaStreamTransformations(...);
>>
>> directKafkaStream.foreachRDD(new Function<JavaRDD<byte[][]>, Void>() {
>>     @Override
>>     public Void call(JavaRDD<byte[][]> v1) throws Exception {
>>         v1.foreachPartition(transformations);
>>         return null;
>>     }
>> });
>>
>> In KafkaStreamTransformations:
>>
>> @Override
>> public void call(Iterator<byte[][]> t) throws Exception {
>>     try {
>>         while (t.hasNext()) {
>>             // ...long running task
>>         }
>>     } catch (Exception e) {
>>         e.printStackTrace();
>>         logger.error("Error while consuming messages from kafka");
>>     }
>> }
>>
>> On Thu, Sep 10, 2015 at 6:58 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> Post the actual stacktrace you're getting
>>>
>>> On Thu, Sep 10, 2015 at 12:21 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>
>>>> Executors in spark streaming 1.3 fetch messages from kafka in batches,
>>>> so what happens when an executor takes a long time to process a fetched batch?
>>>>
>>>> Say in
>>>>
>>>> directKafkaStream.foreachRDD(new Function<JavaRDD<byte[][]>, Void>() {
>>>>     @Override
>>>>     public Void call(JavaRDD<byte[][]> v1) throws Exception {
>>>>         v1.foreachPartition(new VoidFunction<Iterator<byte[][]>>() {
>>>>             @Override
>>>>             public void call(Iterator<byte[][]> t) throws Exception {
>>>>                 // long running task
>>>>             }
>>>>         });
>>>>         return null;
>>>>     }
>>>> });
>>>>
>>>> Will this long-running task drop the executor's connection to the kafka
>>>> brokers? And how do I handle that? I am getting a connection timeout in my code.
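
Since KafkaRDD's iterator fetches from the broker lazily, doing long-running work between `hasNext()`/`next()` calls keeps the broker connection open for the whole duration of the batch. One common workaround (a minimal sketch, assuming the partition's batch fits in executor memory; `DrainFirst` and `drain` are hypothetical names, not part of any Spark or Kafka API) is to drain the partition iterator into a local buffer quickly, then run the slow processing afterwards:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class DrainFirst {

    // Stand-in for the body of a foreachPartition call: consume the Kafka
    // fetch as fast as possible so the broker connection is not held open
    // across the long-running work.
    static List<byte[][]> drain(Iterator<byte[][]> it) {
        List<byte[][]> buffer = new ArrayList<>();
        while (it.hasNext()) {
            buffer.add(it.next()); // no slow work inside this loop
        }
        return buffer;
    }

    public static void main(String[] args) {
        // Simulated partition of byte[][] records, as in the thread's stream type.
        List<byte[][]> partition = Arrays.asList(
                new byte[][] { "k1".getBytes(), "v1".getBytes() },
                new byte[][] { "k2".getBytes(), "v2".getBytes() });

        List<byte[][]> buffered = drain(partition.iterator());

        // The long-running task would run here, after the fetch completed.
        if (buffered.size() != 2) throw new AssertionError("expected 2 records");
        System.out.println("buffered " + buffered.size() + " records");
    }
}
```

The trade-off is memory: this only works when a single partition's batch for one streaming interval fits comfortably on the executor, which is worth checking before adopting it.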
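
For the driver-side "Couldn't find leader offsets" failure during a rebalance, the knobs that usually matter are `spark.streaming.kafka.maxRetries` (driver retries for offset lookup, default 1 in Spark 1.3) and the Kafka consumer settings the direct stream passes through, such as `refresh.leader.backoff.ms` (the source of the "sleeping for 200ms" log line) and `socket.timeout.ms`. A minimal sketch of building such a parameter map, with illustrative values rather than recommendations (`KafkaRetryParams` is a hypothetical helper name):

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaRetryParams {

    // Assemble the kafkaParams map handed to KafkaUtils.createDirectStream.
    // Values are examples only; tune them for your cluster.
    static Map<String, String> kafkaParams(String brokers) {
        Map<String, String> params = new HashMap<>();
        params.put("metadata.broker.list", brokers);
        // How long KafkaRDD sleeps before refetching leader metadata after
        // a NotLeaderForPartitionException (default 200ms).
        params.put("refresh.leader.backoff.ms", "1000");
        // Socket timeout for the underlying SimpleConsumer fetches.
        params.put("socket.timeout.ms", "60000");
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> p = kafkaParams("broker1:9092,broker2:9092");
        if (!"1000".equals(p.get("refresh.leader.backoff.ms"))) throw new AssertionError();
        System.out.println("params ok: " + p.size() + " entries");
    }
}
```

None of this prevents the error while a leader election is actually in progress; it only widens the window in which the driver and executors retry instead of failing the batch.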