Hello,

I'm using spark-streaming-kafka 1.3.0 with the new consumer, "Approach 2:
Direct Approach (No Receivers)" (
http://spark.apache.org/docs/latest/streaming-kafka-integration.html), with
the following code snippet:

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Create the direct Kafka stream with brokers and topics
val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
  ssc, kafkaParams, topicsSet)

// Extract the message payloads from Kafka and decode them from Avro
val raw = messages.map(_._2)
val dStream: DStream[RawScala] = raw.map { bytes =>
  // Avro decoder
  println("Byte length: " + bytes.length)
  val rawDecoder = new AvroDecoder[Raw](schema = Raw.getClassSchema)
  RawScala.toScala(rawDecoder.fromBytes(bytes))
}
// Useful for debugging
dStream.print()
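
For reference, ssc, kafkaParams and topicsSet are built along the lines of the
documentation example (the app name, batch interval, broker address and topic
name below are placeholders, not my real values):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("DirectKafkaExample") // placeholder name
val ssc = new StreamingContext(sparkConf, Seconds(10)) // placeholder batch interval
// With the direct approach, the brokers are passed via "metadata.broker.list"
val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
val topicsSet = Set("mytopic")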

I launch my Spark Streaming application and everything is fine as long as no
logs come in from Kafka. As soon as I send a log, I get the following error:

15/03/30 17:34:40 ERROR TaskContextImpl: Error in TaskCompletionListener
java.lang.NullPointerException
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
    at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
    at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:58)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/03/30 17:34:40 INFO TaskSetManager: Finished task 3.0 in stage 28.0 (TID 94) in 12 ms on localhost (2/4)
15/03/30 17:34:40 INFO TaskSetManager: Finished task 2.0 in stage 28.0 (TID 93) in 13 ms on localhost (3/4)
15/03/30 17:34:40 ERROR Executor: Exception in task 0.0 in stage 28.0 (TID 91)
org.apache.spark.util.TaskCompletionListenerException
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
    at org.apache.spark.scheduler.Task.run(Task.scala:58)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/03/30 17:34:40 WARN TaskSetManager: Lost task 0.0 in stage 28.0 (TID 91, localhost): org.apache.spark.util.TaskCompletionListenerException
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
    at org.apache.spark.scheduler.Task.run(Task.scala:58)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

15/03/30 17:34:40 ERROR TaskSetManager: Task 0 in stage 28.0 failed 1 times; aborting job
15/03/30 17:34:40 INFO TaskSchedulerImpl: Removed TaskSet 28.0, whose tasks have all completed, from pool
15/03/30 17:34:40 INFO TaskSchedulerImpl: Cancelling stage 28
15/03/30 17:34:40 INFO DAGScheduler: Job 28 failed: print at HotFCANextGen.scala:63, took 0,041068 s
15/03/30 17:34:40 INFO JobScheduler: Starting job streaming job 1427729680000 ms.1 from job set of time 1427729680000 ms
15/03/30 17:34:40 ERROR JobScheduler: Error running job streaming job 1427729680000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 28.0 failed 1 times, most recent failure: Lost task 0.0 in stage 28.0 (TID 91, localhost): org.apache.spark.util.TaskCompletionListenerException
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
    at org.apache.spark.scheduler.Task.run(Task.scala:58)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

The same code snippet works fine when the topic has a single partition; the
issue only appears when the topic has multiple partitions. Am I doing
something wrong? Can you help me find the right way to read from a Kafka
topic with multiple partitions?
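
For what it's worth, I would expect the following variant, which creates one
decoder per partition instead of one per record, to behave the same (a sketch,
untested; AvroDecoder and RawScala are my own classes from the snippet above):

val dStream: DStream[RawScala] = raw.mapPartitions { bytes =>
  val rawDecoder = new AvroDecoder[Raw](schema = Raw.getClassSchema)
  bytes.map(b => RawScala.toScala(rawDecoder.fromBytes(b)))
}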

Regards,
Nicolas PHUNG
