Nicolas:
Check whether the following exception occurred in the log:

    errs => throw new SparkException(
      s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
        errs.mkString("\n")),

Cheers

On Mon, Mar 30, 2015 at 9:40 AM, Cody Koeninger <c...@koeninger.org> wrote:

> This line
>
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
>
> is the attempt to close the underlying Kafka simple consumer.
>
> We can add a null pointer check, but the underlying issue of the consumer
> being null probably indicates a problem earlier. Do you see any previous
> error messages?
>
> Also, can you clarify for the successful and failed cases which topics you
> are attempting this on, how many partitions there are, and whether there
> are messages in the partitions? There's an existing JIRA regarding empty
> partitions.
>
> On Mon, Mar 30, 2015 at 11:05 AM, Nicolas Phung <nicolas.ph...@gmail.com> wrote:
>
>> Hello,
>>
>> I'm using spark-streaming-kafka 1.3.0 with the new consumer "Approach 2:
>> Direct Approach (No Receivers)"
>> (http://spark.apache.org/docs/latest/streaming-kafka-integration.html).
>> I'm using the following code snippet:
>>
>> // Create direct kafka stream with brokers and topics
>> val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
>>   ssc, kafkaParams, topicsSet)
>>
>> // Get the stuff from Kafka and print them
>> val raw = messages.map(_._2)
>> val dStream: DStream[RawScala] = raw.map(
>>   byte => {
>>     // Avro Decoder
>>     println("Byte length: " + byte.length)
>>     val rawDecoder = new AvroDecoder[Raw](schema = Raw.getClassSchema)
>>     RawScala.toScala(rawDecoder.fromBytes(byte))
>>   }
>> )
>> // Useful for debug
>> dStream.print()
>>
>> I launch my Spark Streaming job and everything is fine as long as there
>> are no incoming logs from Kafka. When I send a log, I get the following
>> error:
>>
>> 15/03/30 17:34:40 ERROR TaskContextImpl: Error in TaskCompletionListener
>> java.lang.NullPointerException
>> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
>> at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
>> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
>> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
>> at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49)
>> at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68)
>> at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66)
>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66)
>> at org.apache.spark.scheduler.Task.run(Task.scala:58)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>> 15/03/30 17:34:40 INFO TaskSetManager: Finished task 3.0 in stage 28.0 (TID 94) in 12 ms on localhost (2/4)
>> 15/03/30 17:34:40 INFO TaskSetManager: Finished task 2.0 in stage 28.0 (TID 93) in 13 ms on localhost (3/4)
>> 15/03/30 17:34:40 ERROR Executor: Exception in task 0.0 in stage 28.0 (TID 91)
>> org.apache.spark.util.TaskCompletionListenerException
>> at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
>> at org.apache.spark.scheduler.Task.run(Task.scala:58)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>> 15/03/30 17:34:40 WARN TaskSetManager: Lost task 0.0 in stage 28.0 (TID 91, localhost): org.apache.spark.util.TaskCompletionListenerException
>> at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
>> at org.apache.spark.scheduler.Task.run(Task.scala:58)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> 15/03/30 17:34:40 ERROR TaskSetManager: Task 0 in stage 28.0 failed 1 times; aborting job
>> 15/03/30 17:34:40 INFO TaskSchedulerImpl: Removed TaskSet 28.0, whose tasks have all completed, from pool
>> 15/03/30 17:34:40 INFO TaskSchedulerImpl: Cancelling stage 28
>> 15/03/30 17:34:40 INFO DAGScheduler: Job 28 failed: print at HotFCANextGen.scala:63, took 0,041068 s
>> 15/03/30 17:34:40 INFO JobScheduler: Starting job streaming job 1427729680000 ms.1 from job set of time 1427729680000 ms
>> 15/03/30 17:34:40 ERROR JobScheduler: Error running job streaming job 1427729680000 ms.0
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 28.0 failed 1 times, most recent failure: Lost task 0.0 in stage 28.0 (TID 91, localhost): org.apache.spark.util.TaskCompletionListenerException
>> at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
>> at org.apache.spark.scheduler.Task.run(Task.scala:58)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>> at scala.Option.foreach(Option.scala:236)
>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> The same code snippet works well if the topic has a single partition.
>> The issue happens when the topic uses multiple partitions. Am I doing
>> something wrong? Can you help me find the right way to write this for a
>> Kafka topic with multiple partitions?
>>
>> Regards,
>> Nicolas PHUNG
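
For reference, the null pointer check mentioned above for KafkaRDDIterator.close could look roughly like the sketch below. This is only an illustration of the idea, not the actual Spark code: FakeConsumer and GuardedIterator are hypothetical stand-ins for the Kafka simple consumer and the iterator that owns it, and no Spark or Kafka classes are used.

```scala
// Hypothetical stand-in for the Kafka simple consumer; not a real Kafka or Spark class.
final class FakeConsumer {
  var closed: Boolean = false
  def close(): Unit = closed = true
}

// Hypothetical iterator-like owner of the consumer. The field stays null until
// connect() succeeds, which mirrors close() being called before any connection exists.
final class GuardedIterator(connectFn: () => FakeConsumer) {
  private var consumer: FakeConsumer = null

  def connect(): Unit = consumer = connectFn()

  // Null-safe close: Option(null) is None, so nothing is invoked on a missing consumer.
  def close(): Unit = Option(consumer).foreach(_.close())
}

object NullSafeCloseDemo {
  def main(args: Array[String]): Unit = {
    // Case 1: the connection was never established; close() must not throw.
    val neverConnected = new GuardedIterator(() => new FakeConsumer)
    neverConnected.close()

    // Case 2: the connection was established; close() reaches the consumer.
    val consumer = new FakeConsumer
    val connected = new GuardedIterator(() => consumer)
    connected.connect()
    connected.close()
    println(s"consumer closed: ${consumer.closed}")
  }
}
```

As noted in the thread, a guard like this would only avoid the secondary NullPointerException in the task completion listener. The reason the consumer was never created would still have to be found in earlier log messages.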