This line

at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)

is the attempt to close the underlying Kafka SimpleConsumer.

We can add a null pointer check, but the underlying issue of the consumer
being null probably indicates a problem earlier.  Do you see any previous
error messages?
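
For reference, the check would look something like the sketch below. This is
not the actual Spark source, just the shape of the guard; consumer here is
the KafkaRDDIterator field holding the SimpleConsumer:

    // Sketch only: close() with a null guard added. The consumer can
    // still be null at task completion if connecting to the partition
    // leader failed before any messages were fetched.
    override def close(): Unit = {
      if (consumer != null) {
        consumer.close()
      }
    }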

Also, can you clarify, for both the successful and the failed cases, which
topics you are attempting this on, how many partitions there are, and
whether there are messages in the partitions? There's an existing JIRA
regarding empty partitions.
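
If you want to check for yourself whether a partition has messages, one way
is to compare its earliest and latest offsets; they're equal when the
partition is empty. A minimal sketch using the Kafka 0.8 SimpleConsumer API
(host, port, topic, partition, and client id below are placeholders for your
setup; the request should go to the partition's leader):

    import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
    import kafka.common.TopicAndPartition
    import kafka.javaapi.consumer.SimpleConsumer

    // Placeholder connection details; point this at the partition leader.
    val consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "offsetCheck")
    val tp = TopicAndPartition("mytopic", 0)

    // Ask for the single offset before the given "time" marker
    // (EarliestTime / LatestTime are special values, not timestamps).
    def offsetBefore(time: Long): Long = {
      val request = new kafka.javaapi.OffsetRequest(
        java.util.Collections.singletonMap(tp, PartitionOffsetRequestInfo(time, 1)),
        OffsetRequest.CurrentVersion, "offsetCheck")
      consumer.getOffsetsBefore(request).offsets(tp.topic, tp.partition).head
    }

    val earliest = offsetBefore(OffsetRequest.EarliestTime)
    val latest = offsetBefore(OffsetRequest.LatestTime)
    println(s"earliest=$earliest latest=$latest empty=${earliest == latest}")
    consumer.close()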

On Mon, Mar 30, 2015 at 11:05 AM, Nicolas Phung <nicolas.ph...@gmail.com>
wrote:

> Hello,
>
> I'm using spark-streaming-kafka 1.3.0 with the new consumer "Approach 2:
> Direct Approach (No Receivers)" (
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html).
> I'm using the following code snippet:
>
> // Create direct kafka stream with brokers and topics
> val messages = KafkaUtils.createDirectStream[String, Array[Byte],
>     StringDecoder, DefaultDecoder](ssc, kafkaParams, topicsSet)
>
> // Get the stuff from Kafka and print them
> val raw = messages.map(_._2)
> val dStream: DStream[RawScala] = raw.map { byte =>
>   // Avro Decoder
>   println("Byte length: " + byte.length)
>   val rawDecoder = new AvroDecoder[Raw](schema = Raw.getClassSchema)
>   RawScala.toScala(rawDecoder.fromBytes(byte))
> }
> // Useful for debug
> dStream.print()
>
> I launch my Spark Streaming application and everything is fine as long as
> there are no incoming logs from Kafka. When I send a log, I get the
> following error:
>
> 15/03/30 17:34:40 ERROR TaskContextImpl: Error in TaskCompletionListener
> java.lang.NullPointerException
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
> at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
> at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49)
> at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68)
> at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:58)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 15/03/30 17:34:40 INFO TaskSetManager: Finished task 3.0 in stage 28.0 (TID 94) in 12 ms on localhost (2/4)
> 15/03/30 17:34:40 INFO TaskSetManager: Finished task 2.0 in stage 28.0 (TID 93) in 13 ms on localhost (3/4)
> 15/03/30 17:34:40 ERROR Executor: Exception in task 0.0 in stage 28.0 (TID 91)
> org.apache.spark.util.TaskCompletionListenerException
> at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
> at org.apache.spark.scheduler.Task.run(Task.scala:58)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 15/03/30 17:34:40 WARN TaskSetManager: Lost task 0.0 in stage 28.0 (TID 91, localhost): org.apache.spark.util.TaskCompletionListenerException
> at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
> at org.apache.spark.scheduler.Task.run(Task.scala:58)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> 15/03/30 17:34:40 ERROR TaskSetManager: Task 0 in stage 28.0 failed 1 times; aborting job
> 15/03/30 17:34:40 INFO TaskSchedulerImpl: Removed TaskSet 28.0, whose tasks have all completed, from pool
> 15/03/30 17:34:40 INFO TaskSchedulerImpl: Cancelling stage 28
> 15/03/30 17:34:40 INFO DAGScheduler: Job 28 failed: print at HotFCANextGen.scala:63, took 0,041068 s
> 15/03/30 17:34:40 INFO JobScheduler: Starting job streaming job 1427729680000 ms.1 from job set of time 1427729680000 ms
> 15/03/30 17:34:40 ERROR JobScheduler: Error running job streaming job 1427729680000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 28.0 failed 1 times, most recent failure: Lost task 0.0 in stage 28.0 (TID 91, localhost): org.apache.spark.util.TaskCompletionListenerException
> at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
> at org.apache.spark.scheduler.Task.run(Task.scala:58)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> The same code snippet works well if the topic has a single partition. The
> issue happens when the topic has multiple partitions. Am I doing something
> wrong? Can you help me find the right way to write this for a Kafka topic
> with multiple partitions?
>
> Regards,
> Nicolas PHUNG
>
