Did you try this example?
https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala

I think you need to create the topic map with the number of partitions to consume, the way that example does. (There is also a short note on the direct approach below your quoted mail.)
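For reference, the pattern from that example looks roughly like this. This is only a minimal sketch: the app name, ZooKeeper quorum ("zkhost:2181"), consumer group, topic name, and thread count are placeholder values, not taken from your setup.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setAppName("KafkaWordCountSketch")
val ssc = new StreamingContext(sparkConf, Seconds(2))

// With the receiver-based createStream, topics are passed as a
// Map of topic name -> number of consumer threads for that topic.
val topicMap = Map("my-topic" -> 4) // placeholder topic and thread count
val lines = KafkaUtils.createStream(ssc, "zkhost:2181", "my-consumer-group", topicMap)
  .map(_._2) // keep only the message payloads

lines.print()
ssc.start()
ssc.awaitTermination()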
Thanks
Best Regards

On Mon, Mar 30, 2015 at 9:35 PM, Nicolas Phung <nicolas.ph...@gmail.com> wrote:

> Hello,
>
> I'm using spark-streaming-kafka 1.3.0 with the new consumer "Approach 2:
> Direct Approach (No Receivers)"
> (http://spark.apache.org/docs/latest/streaming-kafka-integration.html).
> I'm using the following code snippet:
>
> // Create a direct Kafka stream with brokers and topics
> val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
>   ssc, kafkaParams, topicsSet)
>
> // Get the message payloads from Kafka and print them
> val raw = messages.map(_._2)
> val dStream: DStream[RawScala] = raw.map(byte => {
>   // Avro decoder
>   println("Byte length: " + byte.length)
>   val rawDecoder = new AvroDecoder[Raw](schema = Raw.getClassSchema)
>   RawScala.toScala(rawDecoder.fromBytes(byte))
> })
> // Useful for debugging
> dStream.print()
>
> I launch my Spark Streaming application and everything is fine as long as
> there are no incoming logs from Kafka. When I send a log, I get the
> following error:
>
> 15/03/30 17:34:40 ERROR TaskContextImpl: Error in TaskCompletionListener
> java.lang.NullPointerException
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
> at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
> at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49)
> at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68)
> at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:58)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 15/03/30 17:34:40 INFO TaskSetManager: Finished task 3.0 in stage 28.0 (TID 94) in 12 ms on localhost (2/4)
> 15/03/30 17:34:40 INFO TaskSetManager: Finished task 2.0 in stage 28.0 (TID 93) in 13 ms on localhost (3/4)
> 15/03/30 17:34:40 ERROR Executor: Exception in task 0.0 in stage 28.0 (TID 91)
> org.apache.spark.util.TaskCompletionListenerException
> at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
> at org.apache.spark.scheduler.Task.run(Task.scala:58)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 15/03/30 17:34:40 WARN TaskSetManager: Lost task 0.0 in stage 28.0 (TID 91, localhost): org.apache.spark.util.TaskCompletionListenerException
> at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
> at org.apache.spark.scheduler.Task.run(Task.scala:58)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> 15/03/30 17:34:40 ERROR TaskSetManager: Task 0 in stage 28.0 failed 1 times; aborting job
> 15/03/30 17:34:40 INFO TaskSchedulerImpl: Removed TaskSet 28.0, whose tasks have all completed, from pool
> 15/03/30 17:34:40 INFO TaskSchedulerImpl: Cancelling stage 28
> 15/03/30 17:34:40 INFO DAGScheduler: Job 28 failed: print at HotFCANextGen.scala:63, took 0,041068 s
> 15/03/30 17:34:40 INFO JobScheduler: Starting job streaming job 1427729680000 ms.1 from job set of time 1427729680000 ms
> 15/03/30 17:34:40 ERROR JobScheduler: Error running job streaming job 1427729680000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 28.0 failed 1 times, most recent failure: Lost task 0.0 in stage 28.0 (TID 91, localhost): org.apache.spark.util.TaskCompletionListenerException
> at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
> at org.apache.spark.scheduler.Task.run(Task.scala:58)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> The same code snippet works well when the topic has a single partition;
> the issue only happens when the topic has multiple partitions. Am I doing
> something wrong? Can you help me find the right way to write this for a
> Kafka topic with multiple partitions?
>
> Regards,
> Nicolas PHUNG
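One note on the direct approach you are already using (mentioned above): there, topicsSet is just the set of topic names, a Set[String], without partition counts; createDirectStream queries Kafka for the partitions and creates one Spark partition per Kafka partition. A minimal sketch, with a placeholder broker address and topic name:

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.streaming.kafka.KafkaUtils

// "localhost:9092" and "my-topic" are placeholders for your setup.
val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
val topicsSet = Set("my-topic")
val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
  ssc, kafkaParams, topicsSet)

This assumes an ssc StreamingContext exactly as in your snippet.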