My bad. I got that exception in the driver code of the same job, not in the executor.

But it only mentions a socket close exception:

org.apache.spark.SparkException: ArrayBuffer(java.io.EOFException: Received
-1 when reading from channel, socket has likely been closed.,
org.apache.spark.SparkException: Couldn't find leader offsets for
Set([topicname,51], [topicname,201], [topicname,54], [topicname,93],
[topicname,297], [topicname,123], [topicname,147], [topicname,126],
[topicname,189], [topicname,111], [topicname,159], [topicname,33],
[topicname,36], [topicname,60], [topicname,216], [topicname,9],
[topicname,12], [topicname,282], [topicname,39], [topicname,63],
[topicname,231], [topicname,279], [topicname,18], [topicname,30],
[topicname,276], [topicname,228], [topicname,84], [topicname,252],
[topicname,48], [topicname,150], [topicname,132], [topicname,57],
[topicname,72], [topicname,291], [topicname,234], [topicname,204],
[topicname,186], [topicname,264], [topicname,288], [topicname,87],
[topicname,78], [topicname,249], [topicname,102], [topicname,108],
[topicname,237], [topicname,24], [topicname,96], [topicname,135],
[topicname,198], [topicname,162], [topicname,42], [topicname,258],
[topicname,0], [topicname,174], [topicname,207], [topicname,210],
[topicname,246], [topicname,225], [topicname,270], [topicname,156],
[topicname,183], [topicname,144], [topicname,117], [topicname,69],
[topicname,45], [topicname,219], [topicname,177], [topicname,105],
[topicname,171], [topicname,141], [topicname,285], [topicname,27],
[topicname,168], [topicname,267], [topicname,213], [topicname,153],
[topicname,138], [topicname,255], [topicname,222], [topicname,243],
[topicname,261], [topicname,90], [topicname,114], [topicname,3],
[topicname,81], [topicname,180], [topicname,21], [topicname,6],
[topicname,195], [topicname,129], [topicname,192], [topicname,99],
[topicname,294], [topicname,165], [topicname,240], [topicname,66],
[topicname,75], [topicname,15], [topicname,273], [topicname,120]))
at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:243)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:241)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:241)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:177)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:86)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/09/10 02:36:02 ERROR yarn.ApplicationMaster: User class threw exception:
ArrayBuffer(java.io.EOFException: Received -1 when reading from channel,
socket has likely been closed., org.apache.spark.SparkException: Couldn't
find leader offsets for Set([topicname,51], [topicname,201],
[topicname,54], [topicname,93], [topicname,297], [topicname,123],
[topicname,147], [topicname,126], [topicname,189], [topicname,111],
[topicname,159], [topicname,33], [topicname,36], [topicname,60],
[topicname,216], [topicname,9], [topicname,12], [topicname,282],
[topicname,39], [topicname,63], [topicname,231], [topicname,279],
[topicname,18], [topicname,30], [topicname,276], [topicname,228],
[topicname,84], [topicname,252], [topicname,48], [topicname,150],
[topicname,132], [topicname,57], [topicname,72], [topicname,291],
[topicname,234], [topicname,204], [topicname,186], [topicname,264],
[topicname,288], [topicname,87], [topicname,78], [topicname,249],
[topicname,102], [topicname,108], [topicname,237], [topicname,24],
[topicname,96], [topicname,135], [topicname,198], [topicname,162],
[topicname,42], [topicname,258], [topicname,0], [topicname,174],
[topicname,207], [topicname,210], [topicname,246], [topicname,225],
[topicname,270], [topicname,156], [topicname,183], [topicname,144],
[topicname,117], [topicname,69], [topicname,45], [topicname,219],
[topicname,177], [topicname,105], [topicname,171], [topicname,141],
[topicname,285], [topicname,27], [topicname,168], [topicname,267],
[topicname,213], [topicname,153], [topicname,138], [topicname,255],
[topicname,222], [topicname,243], [topicname,261], [topicname,90],
[topicname,114], [topicname,3], [topicname,81], [topicname,180],
[topicname,21], [topicname,6], [topicname,195], [topicname,129],
[topicname,192], [topicname,99], [topicname,294], [topicname,165],
[topicname,240], [topicname,66], [topicname,75], [topicname,15],
[topicname,273], [topicname,120]))
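
For reference, a rough sketch of what I am thinking of trying (assuming the 1.3 direct stream honours spark.streaming.kafka.maxRetries; the app name, batch interval and retry count below are just placeholders), so that a transient leader-offset lookup failure in the driver does not immediately fail the batch:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf conf = new SparkConf()
    .setAppName("kafka-direct-consumer")            // placeholder app name
    // number of times the driver retries the leader offset lookup; default is 1
    .set("spark.streaming.kafka.maxRetries", "5");  // placeholder value

JavaStreamingContext jssc =
    new JavaStreamingContext(conf, Durations.seconds(30)); // placeholder batch interval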


On Thu, Sep 10, 2015 at 8:18 PM, Cody Koeninger <c...@koeninger.org> wrote:

> NotLeaderForPartitionException means you lost a Kafka broker or had a
> rebalance... why did you say "I am getting a connection timeout in my code"?
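>
> One thing that can help around a rebalance is giving the consumer a bit more
> slack before it refreshes leader metadata, along these lines (rough, untested
> sketch; the broker list and backoff value are placeholders):
>
> import java.util.HashMap;
> import java.util.Map;
>
> Map<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("metadata.broker.list", "broker1:9092,broker2:9092"); // placeholder brokers
> // back-off before re-fetching leader metadata; the "sleeping for 200ms"
> // in your executor log is this setting's 200ms default
> kafkaParams.put("refresh.leader.backoff.ms", "1000");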
>
> You've asked questions about this exact same situation before; the answer
> remains the same.
>
> On Thu, Sep 10, 2015 at 9:44 AM, Shushant Arora <shushantaror...@gmail.com
> > wrote:
>
>> Stack trace is
>> 15/09/09 22:49:52 ERROR kafka.KafkaRDD: Lost leader for topic topicname partition 99, sleeping for 200ms
>> kafka.common.NotLeaderForPartitionException
>>         at sun.reflect.GeneratedConstructorAccessor26.newInstance(Unknown Source)
>>         at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>         at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>         at java.lang.Class.newInstance(Class.java:374)
>>         at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:73)
>>         at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:142)
>>         at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:151)
>>         at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
>>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>         at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
>>         at com.example.hadoop.spark.consumer.KafkaStreamTransformations.call(KafkaStreamTransformations.java:147)
>>         at com.example.hadoop.spark.consumer.KafkaStreamTransformations.call(KafkaStreamTransformations.java:35)
>>         at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:198)
>>         at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:198)
>>         at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
>>         at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
>>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
>>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:745)
>> 15/09/09 22:49:52 ERROR consumer.KafkaStreamTransformations: Error while consuming messages from kafka
>>
>>
>>
>>
>> The actual code is:
>>
>> In the driver:
>>
>> final KafkaStreamTransformations transformations =
>>     new KafkaStreamTransformations(...);
>>
>> directKafkaStream.foreachRDD(new Function<JavaRDD<byte[][]>, Void>() {
>>     @Override
>>     public Void call(JavaRDD<byte[][]> v1) throws Exception {
>>         v1.foreachPartition(transformations);
>>         return null;
>>     }
>> });
>> --------------------------------------------
>>
>> In KafkaStreamTransformations:
>>
>> @Override
>> public void call(Iterator<byte[][]> t) throws Exception {
>>     try {
>>         while (t.hasNext()) {
>>             // ... long running task
>>         }
>>     } catch (Exception e) {
>>         e.printStackTrace();
>>         logger.error("Error while consuming messages from kafka");
>>     }
>> }
>>
>>
>>
>>
>>
>>
>> On Thu, Sep 10, 2015 at 6:58 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> Post the actual stacktrace you're getting
>>>
>>> On Thu, Sep 10, 2015 at 12:21 AM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
>>>> Executors in Spark Streaming 1.3 fetch messages from Kafka in batches.
>>>> What happens when an executor takes a long time to process a batch,
>>>>
>>>> say in
>>>>
>>>>
>>>> directKafkaStream.foreachRDD(new Function<JavaRDD<byte[][]>, Void>() {
>>>>     @Override
>>>>     public Void call(JavaRDD<byte[][]> v1) throws Exception {
>>>>         v1.foreachPartition(new VoidFunction<Iterator<byte[][]>>() {
>>>>             @Override
>>>>             public void call(Iterator<byte[][]> t) throws Exception {
>>>>                 // long running task
>>>>             }
>>>>         });
>>>>         return null;
>>>>     }
>>>> });
>>>>
>>>> Will this long running task drop the connection of the executor with the
>>>> Kafka brokers? And how do I handle that? I am getting a connection timeout
>>>> in my code.
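>>>>
>>>> Would something like the following help (just a rough sketch on my side;
>>>> broker address and values are placeholders): capping the per-partition rate
>>>> so a batch stays bounded, and raising the consumer socket timeout?
>>>>
>>>> import java.util.HashMap;
>>>> import java.util.Map;
>>>> import org.apache.spark.SparkConf;
>>>>
>>>> SparkConf conf = new SparkConf()
>>>>     // limit messages per partition per second so batches stay bounded (placeholder)
>>>>     .set("spark.streaming.kafka.maxRatePerPartition", "1000");
>>>>
>>>> Map<String, String> kafkaParams = new HashMap<String, String>();
>>>> kafkaParams.put("metadata.broker.list", "broker1:9092"); // placeholder broker
>>>> kafkaParams.put("socket.timeout.ms", "60000");           // consumer socket timeout; default 30000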
>>>>
>>>>
>>>>
>>>>
>>>
>>
>
