Hello,

I'm using spark-streaming-kafka 1.3.0 with the new consumer "Approach 2: Direct Approach (No Receivers)" (http://spark.apache.org/docs/latest/streaming-kafka-integration.html).
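For reference, the ssc, kafkaParams and topicsSet used below are built more or less like this (the application name, batch interval, broker address and topic name here are placeholders, not my real values):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val sparkConf = new SparkConf().setAppName("MyStreamingApp") // placeholder name
    val ssc = new StreamingContext(sparkConf, Seconds(10))       // placeholder batch interval

    // Direct approach: list the Kafka brokers directly, no receiver / ZooKeeper consumer
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
    // The topic(s) to consume from (placeholder name)
    val topicsSet = Set("mytopic")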
I'm using the following code snippet (see also the P.S. at the end for a small variant of the decoding part):

    import kafka.serializer.{DefaultDecoder, StringDecoder}
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Create direct kafka stream with brokers and topics
    val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the stuff from Kafka and print it
    // (Raw, RawScala and AvroDecoder are our own Avro helper classes)
    val raw = messages.map(_._2)
    val dStream: DStream[RawScala] = raw.map { bytes =>
      // Avro decoder
      println("Byte length: " + bytes.length)
      val rawDecoder = new AvroDecoder[Raw](schema = Raw.getClassSchema)
      RawScala.toScala(rawDecoder.fromBytes(bytes))
    }

    // Useful for debug
    dStream.print()

I launch my Spark Streaming application and everything is fine as long as no logs are coming in from Kafka. As soon as I send a log, I get the following error:

15/03/30 17:34:40 ERROR TaskContextImpl: Error in TaskCompletionListener
java.lang.NullPointerException
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
    at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
    at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:58)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/03/30 17:34:40 INFO TaskSetManager: Finished task 3.0 in stage 28.0 (TID 94) in 12 ms on localhost (2/4)
15/03/30 17:34:40 INFO TaskSetManager: Finished task 2.0 in stage 28.0 (TID 93) in 13 ms on localhost (3/4)
15/03/30 17:34:40 ERROR Executor: Exception in task 0.0 in stage 28.0 (TID 91)
org.apache.spark.util.TaskCompletionListenerException
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
    at org.apache.spark.scheduler.Task.run(Task.scala:58)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/03/30 17:34:40 WARN TaskSetManager: Lost task 0.0 in stage 28.0 (TID 91, localhost): org.apache.spark.util.TaskCompletionListenerException
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
    at org.apache.spark.scheduler.Task.run(Task.scala:58)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/03/30 17:34:40 ERROR TaskSetManager: Task 0 in stage 28.0 failed 1 times; aborting job
15/03/30 17:34:40 INFO TaskSchedulerImpl: Removed TaskSet 28.0, whose tasks have all completed, from pool
15/03/30 17:34:40 INFO TaskSchedulerImpl: Cancelling stage 28
15/03/30 17:34:40 INFO DAGScheduler: Job 28 failed: print at HotFCANextGen.scala:63, took 0,041068 s
15/03/30 17:34:40 INFO JobScheduler: Starting job streaming job 1427729680000 ms.1 from job set of time 1427729680000 ms
15/03/30 17:34:40 ERROR JobScheduler: Error running job streaming job 1427729680000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 28.0 failed 1 times, most recent failure: Lost task 0.0 in stage 28.0 (TID 91, localhost): org.apache.spark.util.TaskCompletionListenerException
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
    at org.apache.spark.scheduler.Task.run(Task.scala:58)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

The same code snippet works fine when the topic has a single partition. The issue only appears when the topic has multiple partitions. Am I doing something wrong? Can you help me find the right way to write this for a Kafka topic with multiple partitions?

Regards,
Nicolas PHUNG
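P.S. One aside on the snippet itself: as written, it builds a new AvroDecoder for every record. The variant I have in mind builds the decoder once per partition with mapPartitions instead (just an untested sketch using the same AvroDecoder and RawScala classes as above; I don't expect it to change the error, it should only make the decoding cheaper):

    val dStream: DStream[RawScala] = raw.mapPartitions { iter =>
      // Build the Avro decoder once per partition instead of once per record
      val rawDecoder = new AvroDecoder[Raw](schema = Raw.getClassSchema)
      iter.map(bytes => RawScala.toScala(rawDecoder.fromBytes(bytes)))
    }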