Did you try this example?

https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala

I think you need to create a topic set with # partitions to consume.

Thanks
Best Regards

On Mon, Mar 30, 2015 at 9:35 PM, Nicolas Phung <nicolas.ph...@gmail.com>
wrote:

> Hello,
>
> I'm using spark-streaming-kafka 1.3.0 with the new consumer "Approach 2:
> Direct Approach (No Receivers)" (
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html).
> I'm using the following code snippets :
>
> // Create direct kafka stream with brokers and topics
> val messages = KafkaUtils.createDirectStream[String, Array[Byte],
> StringDecoder, DefaultDecoder](
> ssc, kafkaParams, topicsSet)
>
> // Get the stuff from Kafka and print them
> val raw = messages.map(_._2)
> val dStream: DStream[RawScala] = raw.map(
> byte => {
> // Avro Decoder
> println("Byte length: " + byte.length)
> val rawDecoder = new AvroDecoder[Raw](schema = Raw.getClassSchema)
> RawScala.toScala(rawDecoder.fromBytes(byte))
> }
> )
> // Useful for debug
> dStream.print()
>
> I launch my Spark Streaming and everything is fine if there's no incoming
> logs from Kafka. When I'm sending a log, I got the following error :
>
> 15/03/30 17:34:40 ERROR TaskContextImpl: Error in TaskCompletionListener
> java.lang.NullPointerException
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
> at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
> at
> org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49)
> at
> org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68)
> at
> org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:58)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 15/03/30 17:34:40 INFO TaskSetManager: Finished task 3.0 in stage 28.0
> (TID 94) in 12 ms on localhost (2/4)
> 15/03/30 17:34:40 INFO TaskSetManager: Finished task 2.0 in stage 28.0
> (TID 93) in 13 ms on localhost (3/4)
> 15/03/30 17:34:40 ERROR Executor: Exception in task 0.0 in stage 28.0 (TID
> 91)
> org.apache.spark.util.TaskCompletionListenerException
> at
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
> at org.apache.spark.scheduler.Task.run(Task.scala:58)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 15/03/30 17:34:40 WARN TaskSetManager: Lost task 0.0 in stage 28.0 (TID
> 91, localhost): org.apache.spark.util.TaskCompletionListenerException
> at
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
> at org.apache.spark.scheduler.Task.run(Task.scala:58)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> 15/03/30 17:34:40 ERROR TaskSetManager: Task 0 in stage 28.0 failed 1
> times; aborting job
> 15/03/30 17:34:40 INFO TaskSchedulerImpl: Removed TaskSet 28.0, whose
> tasks have all completed, from pool
> 15/03/30 17:34:40 INFO TaskSchedulerImpl: Cancelling stage 28
> 15/03/30 17:34:40 INFO DAGScheduler: Job 28 failed: print at
> HotFCANextGen.scala:63, took 0,041068 s
> 15/03/30 17:34:40 INFO JobScheduler: Starting job streaming job
> 1427729680000 ms.1 from job set of time 1427729680000 ms
> 15/03/30 17:34:40 ERROR JobScheduler: Error running job streaming job
> 1427729680000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 28.0 failed 1 times, most recent failure: Lost task 0.0 in stage
> 28.0 (TID 91, localhost):
> org.apache.spark.util.TaskCompletionListenerException
> at
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
> at org.apache.spark.scheduler.Task.run(Task.scala:58)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> The same code snippet works well if the topic is with a single partition.
> The issue happens when the topic is using multiple partitions. Am I doing
> something wrong ? Can you help me find the right way to write this with
> kafka topic with multiple partitions.
>
> Regards,
> Nicolas PHUNG
>

Reply via email to