You need type parameters on the call to createRDD indicating the key and value types and the decoder to use for each; that's why your result is coming back as RDD[(Nothing, Nothing)].
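Something along these lines, assuming String keys and values with StringDecoder for both (adjust to whatever serialization your messages actually use):

  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka._

  // type parameters: key type, value type, key decoder, value decoder
  val messages = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
    sc, kafkaParams, offsetRange.toArray)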
See https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/BasicRDD.scala

Also, you need to check whether offsets 0 through 100 are still actually present in the Kafka logs (see the GetOffsetShell check sketched below, after the quoted message).

On Tue, Sep 22, 2015 at 9:38 AM, Yana Kadiyska <yana.kadiy...@gmail.com> wrote:
> Hi folks, I'm trying to write a simple Spark job that dumps out a Kafka queue
> into HDFS. Being very new to Kafka, not sure if I'm messing something up on
> that side... My hope is to read the messages presently in the queue (or at
> least the first 100 for now).
>
> Here is what I have:
>
> Kafka side:
>
> ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic ingress --broker-list IP1:9092,IP2:9092,IP3:9092 --time -1
> ingress:0:34386
> ingress:1:34148
> ingress:2:34300
>
> On the Spark side I'm trying this (1.4.1):
>
> bin/spark-shell --jars kafka-clients-0.8.2.0.jar,spark-streaming-kafka_2.10-1.4.1.jar,kafka_2.10-0.8.2.0.jar,metrics-core-2.2.0.ja
>
> val brokers = "IP1:9092,IP2:9092,IP3:9092" // same as the IPs above
> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>
> val offsetRange = (0 to 2).map(part => OffsetRange.create("ingress", part, 0, 100))
> val messages = KafkaUtils.createRDD(sc, kafkaParams, offsetRange.toArray)
> messages: org.apache.spark.rdd.RDD[(Nothing, Nothing)] = KafkaRDD[1] at RDD at KafkaRDD.scala:45
>
> When I try messages.count I get:
>
> 15/09/22 14:01:17 ERROR TaskContextImpl: Error in TaskCompletionListener
> java.lang.NullPointerException
>         at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:157)
>         at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
>         at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:100)
>         at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:100)
>         at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:56)
>         at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:75)
>         at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:73)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:73)
>         at org.apache.spark.scheduler.Task.run(Task.scala:72)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
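As for verifying the offsets: running GetOffsetShell with --time -2 returns the earliest offset still retained for each partition (--time -1, as in your run above, returns the latest). If the earliest offset for a partition is above 0, some or all of the requested 0 through 100 range has already been deleted by retention, and the read will fail. Using the same topic and brokers as in your command:

  ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic ingress --broker-list IP1:9092,IP2:9092,IP3:9092 --time -2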