[ 
https://issues.apache.org/jira/browse/SPARK-10122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14704674#comment-14704674
 ] 

Amit Ramesh edited comment on SPARK-10122 at 8/20/15 10:51 AM:
---------------------------------------------------------------

[~srowen] As you can see in the example, offsetRanges() is applied to the 
initial RDD inside the transform operation. The code works fine if the line 
'kafka_stream.transform(attach_kafka_metadata).count().pprint()' is 
changed to 'kafka_stream.transform(attach_kafka_metadata).pprint()'.



> AttributeError: 'RDD' object has no attribute 'offsetRanges'
> ------------------------------------------------------------
>
>                 Key: SPARK-10122
>                 URL: https://issues.apache.org/jira/browse/SPARK-10122
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Streaming
>            Reporter: Amit Ramesh
>            Priority: Critical
>              Labels: kafka
>
> SPARK-8389 added the offsetRanges interface to Kafka direct streams. This, 
> however, appears to break when further operations are chained after a 
> transform operation. The following example code results in an error (stack 
> trace below). Note that if the 'count()' operation is removed from the 
> example code, the error no longer occurs and the Kafka data is printed.
> {code:title=kafka_test.py|collapse=true}
> from pyspark import SparkContext
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kafka import KafkaUtils
>
>
> def attach_kafka_metadata(kafka_rdd):
>     offset_ranges = kafka_rdd.offsetRanges()
>     return kafka_rdd
>
>
> if __name__ == "__main__":
>     sc = SparkContext(appName='kafka-test')
>     ssc = StreamingContext(sc, 10)
>     kafka_stream = KafkaUtils.createDirectStream(
>         ssc,
>         [TOPIC],
>         kafkaParams={
>             'metadata.broker.list': BROKERS,
>         },
>     )
>     kafka_stream.transform(attach_kafka_metadata).count().pprint()
>     ssc.start()
>     ssc.awaitTermination()
> {code}
> {code:title=Stack trace|collapse=true}
> Traceback (most recent call last):
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 62, in call
>     r = self.func(t, *rdds)
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
>     self.func = lambda t, rdd: func(t, prev_func(t, rdd))
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
>     self.func = lambda t, rdd: func(t, prev_func(t, rdd))
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
>     self.func = lambda t, rdd: func(t, prev_func(t, rdd))
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
>     self.func = lambda t, rdd: func(t, prev_func(t, rdd))
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 332, in <lambda>
>     func = lambda t, rdd: oldfunc(rdd)
>   File "/home/spark/ad_realtime/batch/kafka_test.py", line 7, in attach_kafka_metadata
>     offset_ranges = kafka_rdd.offsetRanges()
> AttributeError: 'RDD' object has no attribute 'offsetRanges'
> {code}
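
The repeated 'func(t, prev_func(t, rdd))' frames in the stack trace suggest that chained transform functions are fused into a single function. The sketch below is a toy model of how such fusion could end up handing a plain RDD to the first function. It is not PySpark's actual implementation, and may not match the real cause of this issue. All class names in it (PlainRDD, KafkaLikeRDD, Stage, KafkaStage) are made up for illustration, and it runs without Spark or Kafka.

```python
# Toy model only: hypothetical stand-ins, not PySpark's real classes.


class PlainRDD(object):
    """Stand-in for an ordinary RDD, which has no offsetRanges()."""

    def __init__(self, data):
        self.data = data


class KafkaLikeRDD(PlainRDD):
    """Stand-in for an RDD that also carries its Kafka offset ranges."""

    def __init__(self, data, ranges):
        super(KafkaLikeRDD, self).__init__(data)
        self._ranges = ranges

    def offsetRanges(self):
        return self._ranges


class Stage(object):
    """A transform stage that fuses its function with a preceding stage's."""

    def __init__(self, prev, func):
        if isinstance(prev, Stage):
            # Same shape as the lambdas visible in the stack trace.
            prev_func = prev.func
            self.func = lambda t, rdd: func(t, prev_func(t, rdd))
        else:
            self.func = func

    def wrap(self, data, ranges):
        # A generic stage only knows how to build a plain RDD.
        return PlainRDD(data)

    def run(self, t, data, ranges):
        # Only the outermost stage decides how the batch is wrapped.
        return self.func(t, self.wrap(data, ranges))


class KafkaStage(Stage):
    """A stage that wraps each batch in an offset-aware RDD."""

    def wrap(self, data, ranges):
        return KafkaLikeRDD(data, ranges)


def attach_kafka_metadata(t, rdd):
    rdd.offsetRanges()
    return rdd


def count(t, rdd):
    return len(rdd.data)


batch = (["a", "b"], [("topic", 0, 0, 2)])

# A Kafka-aware stage on its own receives an offset-aware RDD.
alone = KafkaStage(None, attach_kafka_metadata)
alone_result = alone.run(0, *batch)

# Chaining a generic stage fuses the functions, but the generic wrapper is
# now the one in use, so attach_kafka_metadata receives a PlainRDD.
chained = Stage(alone, count)
try:
    chained.run(0, *batch)
    chained_error = None
except AttributeError as exc:
    chained_error = str(exc)
```

In this toy, the stage on its own succeeds while the chained version fails with an AttributeError about 'offsetRanges', which mirrors the difference reported between the '.pprint()' and '.count().pprint()' variants.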


