Thanks a lot Cody! I was punting on the decoders by calling count (or
trying to, since my types require a custom decoder), but your sample code is
exactly what I was trying to achieve. The error message threw me off; I'll
work on the decoders now.
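
For reference, here is a rough sketch of what I'm aiming for. The Ingress
payload type and IngressDecoder are hypothetical stand-ins for my actual
types; only the shape of the typed createRDD call follows what Cody
describes below:

import kafka.serializer.{Decoder, StringDecoder}
import kafka.utils.VerifiableProperties
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

// Hypothetical payload type standing in for whatever the topic really carries.
case class Ingress(raw: String)

// Spark instantiates the decoder reflectively, so it needs a constructor
// that accepts kafka.utils.VerifiableProperties.
class IngressDecoder(props: VerifiableProperties = null) extends Decoder[Ingress] {
  override def fromBytes(bytes: Array[Byte]): Ingress =
    Ingress(new String(bytes, "UTF-8"))
}

val brokers = "IP1:9092,IP2:9092,IP3:9092"
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

// Per Cody's second point, confirm offsets 0-100 are still retained, e.g. by
// re-running GetOffsetShell with --time -2 (earliest) as well as --time -1 (latest).
val offsetRanges = (0 to 2).map(part => OffsetRange.create("ingress", part, 0, 100)).toArray

// Explicit type parameters: key type, value type, key decoder, value decoder.
val messages = KafkaUtils.createRDD[String, Ingress, StringDecoder, IngressDecoder](
  sc, kafkaParams, offsetRanges)
// messages: org.apache.spark.rdd.RDD[(String, Ingress)] rather than (Nothing, Nothing)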

On Tue, Sep 22, 2015 at 10:50 AM, Cody Koeninger <c...@koeninger.org> wrote:

> You need type parameters for the call to createRDD indicating the type of
> the key / value and the decoder to use for each.
>
> See
>
>
> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/BasicRDD.scala
>
> Also, you need to check to see if offsets 0 through 100 are still actually
> present in the kafka logs.
>
> On Tue, Sep 22, 2015 at 9:38 AM, Yana Kadiyska <yana.kadiy...@gmail.com>
> wrote:
>
>> Hi folks, I'm trying to write a simple Spark job that dumps out a Kafka
>> queue into HDFS. Being very new to Kafka, I'm not sure if I'm messing
>> something up on that side... My hope is to read the messages presently in
>> the queue (or at least the first 100 for now).
>>
>> Here is what I have:
>> Kafka side:
>>
>>  ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic ingress 
>> --broker-list IP1:9092,IP2:9092,IP3:9092 --time -1
>> ingress:0:34386
>> ingress:1:34148
>> ingress:2:34300
>>
>>
>> On the Spark side I'm trying this (1.4.1):
>>
>> bin/spark-shell --jars
>> kafka-clients-0.8.2.0.jar,spark-streaming-kafka_2.10-1.4.1.jar,kafka_2.10-0.8.2.0.jar,metrics-core-2.2.0.jar
>>
>> import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}
>>
>> val brokers = "IP1:9092,IP2:9092,IP3:9092" // same as the IPs above
>> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>>
>> val offsetRange = (0 to 2).map(part => OffsetRange.create("ingress", part, 0, 100))
>> val messages = KafkaUtils.createRDD(sc, kafkaParams, offsetRange.toArray)
>> messages: org.apache.spark.rdd.RDD[(Nothing, Nothing)] = KafkaRDD[1] at RDD 
>> at KafkaRDD.scala:45
>>
>>
>> When I try messages.count I get:
>>
>> 15/09/22 14:01:17 ERROR TaskContextImpl: Error in TaskCompletionListener
>> java.lang.NullPointerException
>>      at 
>> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:157)
>>      at 
>> org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
>>      at 
>> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:100)
>>      at 
>> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:100)
>>      at 
>> org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:56)
>>      at 
>> org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:75)
>>      at 
>> org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:73)
>>      at 
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>      at 
>> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:73)
>>      at org.apache.spark.scheduler.Task.run(Task.scala:72)
>>      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>      at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>      at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>      at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>
