Thanks a lot, Cody! I was punting on the decoders by calling count (or trying to, since my types require a custom decoder), but your sample code is exactly what I was trying to achieve. The error message threw me off; I'll work on the decoders now.
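In case it helps anyone searching the archives, here is roughly what I think the corrected call looks like with explicit type parameters and decoders. This is only a sketch using StringDecoder as a placeholder; my real values will need a custom Decoder implementation.

import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}
import kafka.serializer.StringDecoder

val brokers = "IP1:9092,IP2:9092,IP3:9092"
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

// per Cody's note, the requested offsets must still be present in the Kafka logs
val offsetRanges = (0 to 2).map(part => OffsetRange.create("ingress", part, 0, 100)).toArray

// four type parameters: key type, value type, key decoder, value decoder
val messages = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)

messages.count

With the decoders specified, the RDD should come back as RDD[(String, String)] instead of RDD[(Nothing, Nothing)], which I believe is what was producing the confusing error below.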
On Tue, Sep 22, 2015 at 10:50 AM, Cody Koeninger <c...@koeninger.org> wrote:

> You need type parameters for the call to createRDD indicating the type of
> the key / value and the decoder to use for each.
>
> See
>
> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/BasicRDD.scala
>
> Also, you need to check to see if offsets 0 through 100 are still actually
> present in the kafka logs.
>
> On Tue, Sep 22, 2015 at 9:38 AM, Yana Kadiyska <yana.kadiy...@gmail.com> wrote:
>
>> Hi folks, I'm trying to write a simple Spark job that dumps out a Kafka
>> queue into HDFS. Being very new to Kafka, I'm not sure if I'm messing
>> something up on that side... My hope is to read the messages presently in
>> the queue (or at least the first 100 for now).
>>
>> Here is what I have.
>>
>> Kafka side:
>>
>> ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic ingress --broker-list IP1:9092,IP2:9092,IP3:9092 --time -1
>> ingress:0:34386
>> ingress:1:34148
>> ingress:2:34300
>>
>> On the Spark side I'm trying this (1.4.1):
>>
>> bin/spark-shell --jars kafka-clients-0.8.2.0.jar,spark-streaming-kafka_2.10-1.4.1.jar,kafka_2.10-0.8.2.0.jar,metrics-core-2.2.0.jar
>>
>> val brokers = "IP1:9092,IP2:9092,IP3:9092" // same as the IPs above
>> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>>
>> val offsetRange = (0 to 2).map(part => OffsetRange.create("ingress", part, 0, 100))
>> val messages = KafkaUtils.createRDD(sc, kafkaParams, offsetRange.toArray)
>> messages: org.apache.spark.rdd.RDD[(Nothing, Nothing)] = KafkaRDD[1] at RDD at KafkaRDD.scala:45
>>
>> When I try messages.count I get:
>>
>> 15/09/22 14:01:17 ERROR TaskContextImpl: Error in TaskCompletionListener
>> java.lang.NullPointerException
>>     at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:157)
>>     at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
>>     at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:100)
>>     at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:100)
>>     at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:56)
>>     at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:75)
>>     at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:73)
>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>     at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:73)
>>     at org.apache.spark.scheduler.Task.run(Task.scala:72)
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>     at java.lang.Thread.run(Thread.java:745)