You need type parameters on the call to createRDD indicating the key and
value types, and the decoder to use for each.
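
For example, a minimal sketch assuming String keys and values (swap in
the decoders that match your actual message format):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    // Explicit type parameters: key type, value type, key decoder, value decoder
    val messages = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRange.toArray)

Without the explicit type parameters, the key and value types get
inferred as Nothing, which is exactly what the RDD[(Nothing, Nothing)]
in your shell output shows.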

See

https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/BasicRDD.scala

Also, you need to check whether offsets 0 through 100 are still actually
present in the Kafka logs; depending on your retention settings, the
earliest messages may already have been deleted.
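
For example, the same GetOffsetShell tool you ran below will report the
earliest offset still available for each partition if you pass --time -2
instead of -1:

    ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic ingress \
      --broker-list IP1:9092,IP2:9092,IP3:9092 --time -2

If the earliest offset for any partition is greater than 0, an
OffsetRange starting at 0 will fail.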

On Tue, Sep 22, 2015 at 9:38 AM, Yana Kadiyska <yana.kadiy...@gmail.com>
wrote:

> Hi folks, I'm trying to write a simple Spark job that dumps out a Kafka
> queue into HDFS. Being very new to Kafka, I'm not sure if I'm messing
> something up on that side... My hope is to read the messages presently in
> the queue (or at least the first 100 for now).
>
> Here is what I have:
> Kafka side:
>
>  ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic ingress 
> --broker-list IP1:9092,IP2:9092,IP3:9092 --time -1
> ingress:0:34386
> ingress:1:34148
> ingress:2:34300
>
> On the Spark side I'm trying this (1.4.1):
>
> bin/spark-shell --jars
> kafka-clients-0.8.2.0.jar,spark-streaming-kafka_2.10-1.4.1.jar,kafka_2.10-0.8.2.0.jar,metrics-core-2.2.0.jar
>
> val brokers="IP1:9092,IP2:9092,IP3:9092" //same as IPs above
> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>
> val offsetRange= (0 to 2).map(part=>OffsetRange.create("ingress",part,0,100))
> val messages= KafkaUtils.createRDD(sc,kafkaParams,offsetRange.toArray)
> messages: org.apache.spark.rdd.RDD[(Nothing, Nothing)] = KafkaRDD[1] at RDD 
> at KafkaRDD.scala:45
>
> When I try messages.count, I get:
>
> 15/09/22 14:01:17 ERROR TaskContextImpl: Error in TaskCompletionListener
> java.lang.NullPointerException
>       at 
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:157)
>       at 
> org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
>       at 
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:100)
>       at 
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:100)
>       at 
> org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:56)
>       at 
> org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:75)
>       at 
> org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:73)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>       at 
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:73)
>       at org.apache.spark.scheduler.Task.run(Task.scala:72)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
>
