Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?

2015-06-12 Thread Saisai Shao
Scala KafkaRDD uses a trait to handle this problem, but that is not so easy
or straightforward in Python, where we would need a specific API to handle
it. I'm not sure whether there is any simple workaround; maybe we should
think about it carefully.
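
For anyone who hasn't looked at it, the trait in question is HasOffsetRanges
in the spark-streaming-kafka module; simplified, its shape is roughly the
following (a sketch, not the verbatim source):

    import org.apache.spark.streaming.kafka.OffsetRange

    // RDDs produced by the direct stream mix this in, so a caller can cast
    // an RDD to HasOffsetRanges and read the [fromOffset, untilOffset)
    // range covered for each topic/partition in that batch.
    trait HasOffsetRanges {
      def offsetRanges: Array[OffsetRange]
    }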

2015-06-12 13:59 GMT+08:00 Amit Ramesh a...@yelp.com:


 Thanks, Jerry. That's what I suspected based on the code I looked at. Any
 pointers on what is needed to build in this support would be great. This is
 critical to the project we are currently working on.

 Thanks!


 On Thu, Jun 11, 2015 at 10:54 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 OK, I get it. I think the current Python-based Kafka direct API does not
 provide an equivalent to the Scala one; maybe we should figure out how to
 add this to the Python API as well.

 2015-06-12 13:48 GMT+08:00 Amit Ramesh a...@yelp.com:


 Hi Jerry,

 Take a look at this example:
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2

 The offsets are needed because, as RDDs get generated within Spark, the
 offsets move further along. With direct Kafka mode the current offsets are
 no longer persisted in ZooKeeper but rather within Spark itself. If you want
 to be able to use ZooKeeper-based monitoring tools to keep track of
 progress, then this is needed.

 In my specific case we need to persist Kafka offsets externally so that
 we can continue from where we left off after a code deployment. In other
 words, we need exactly-once processing guarantees across code deployments.
 Spark does not support any state persistence across deployments so this is
 something we need to handle on our own.

 Hope that helps. Let me know if not.

 Thanks!
 Amit


 On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Hi,

 What do you mean by getting the offsets from the RDD? From my
 understanding, the offsetRange is a parameter you supply to KafkaRDD, so
 why do you still want to get back the one you previously set?

 Thanks
 Jerry

 2015-06-12 12:36 GMT+08:00 Amit Ramesh a...@yelp.com:


 Congratulations on the release of 1.4!

 I have been trying out the direct Kafka support in python but haven't
 been able to figure out how to get the offsets from the RDD. Looks like 
 the
 documentation is yet to be updated to include Python examples (
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html).
 I am specifically looking for the equivalent of
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2.
 I tried digging through the python code but could not find anything
 related. Any pointers would be greatly appreciated.

 Thanks!
 Amit








Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?

2015-06-12 Thread Juan Rodríguez Hortalá
Hi,

If you want, I would be happy to work on this. I have worked with
KafkaUtils.createDirectStream before, in a pull request that wasn't
accepted (https://github.com/apache/spark/pull/5367). I'm fluent in Python
and I'm starting to feel comfortable with Scala, so if someone opens a JIRA
I can take it.

Greetings,

Juan Rodriguez


2015-06-12 15:59 GMT+02:00 Cody Koeninger c...@koeninger.org:

 The Scala API has two ways of calling createDirectStream. One of them
 allows you to pass a message handler that gets full access to the Kafka
 MessageAndMetadata, including the offset.

 I don't know why the Python API was developed with only one way to call
 createDirectStream, but the first thing I'd look at would be adding that
 functionality back in. If someone wants help creating a patch for that,
 just let me know.

 Dealing with offsets on a per-message basis may not be as efficient as
 dealing with them on a batch basis using the HasOffsetRanges interface...
 but if efficiency was a primary concern, you probably wouldn't be using
 Python anyway.

 On Fri, Jun 12, 2015 at 1:05 AM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Scala KafkaRDD uses a trait to handle this problem, but that is not so easy
 or straightforward in Python, where we would need a specific API to handle
 it. I'm not sure whether there is any simple workaround; maybe we should
 think about it carefully.

 2015-06-12 13:59 GMT+08:00 Amit Ramesh a...@yelp.com:


 Thanks, Jerry. That's what I suspected based on the code I looked at.
 Any pointers on what is needed to build in this support would be great.
 This is critical to the project we are currently working on.

 Thanks!


 On Thu, Jun 11, 2015 at 10:54 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 OK, I get it. I think the current Python-based Kafka direct API does not
 provide an equivalent to the Scala one; maybe we should figure out how to
 add this to the Python API as well.

 2015-06-12 13:48 GMT+08:00 Amit Ramesh a...@yelp.com:


 Hi Jerry,

 Take a look at this example:
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2

 The offsets are needed because, as RDDs get generated within Spark, the
 offsets move further along. With direct Kafka mode the current offsets are
 no longer persisted in ZooKeeper but rather within Spark itself. If you want
 to be able to use ZooKeeper-based monitoring tools to keep track of
 progress, then this is needed.

 In my specific case we need to persist Kafka offsets externally so
 that we can continue from where we left off after a code deployment. In
 other words, we need exactly-once processing guarantees across code
 deployments. Spark does not support any state persistence across
 deployments so this is something we need to handle on our own.

 Hope that helps. Let me know if not.

 Thanks!
 Amit


 On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Hi,

 What do you mean by getting the offsets from the RDD? From my
 understanding, the offsetRange is a parameter you supply to KafkaRDD, so
 why do you still want to get back the one you previously set?

 Thanks
 Jerry

 2015-06-12 12:36 GMT+08:00 Amit Ramesh a...@yelp.com:


 Congratulations on the release of 1.4!

 I have been trying out the direct Kafka support in python but
 haven't been able to figure out how to get the offsets from the RDD. 
 Looks
 like the documentation is yet to be updated to include Python examples (
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html).
 I am specifically looking for the equivalent of
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2.
 I tried digging through the python code but could not find anything
 related. Any pointers would be greatly appreciated.

 Thanks!
 Amit










Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?

2015-06-12 Thread Amit Ramesh
Hi Juan,

I have created a ticket for this:
https://issues.apache.org/jira/browse/SPARK-8337

Thanks!
Amit


On Fri, Jun 12, 2015 at 3:17 PM, Juan Rodríguez Hortalá 
juan.rodriguez.hort...@gmail.com wrote:

 Hi,

 If you want, I would be happy to work on this. I have worked with
 KafkaUtils.createDirectStream before, in a pull request that wasn't
 accepted (https://github.com/apache/spark/pull/5367). I'm fluent in Python
 and I'm starting to feel comfortable with Scala, so if someone opens a JIRA
 I can take it.

 Greetings,

 Juan Rodriguez


 2015-06-12 15:59 GMT+02:00 Cody Koeninger c...@koeninger.org:

 The Scala API has two ways of calling createDirectStream. One of them
 allows you to pass a message handler that gets full access to the Kafka
 MessageAndMetadata, including the offset.

 I don't know why the Python API was developed with only one way to call
 createDirectStream, but the first thing I'd look at would be adding that
 functionality back in. If someone wants help creating a patch for that,
 just let me know.

 Dealing with offsets on a per-message basis may not be as efficient as
 dealing with them on a batch basis using the HasOffsetRanges interface...
 but if efficiency was a primary concern, you probably wouldn't be using
 Python anyway.

 On Fri, Jun 12, 2015 at 1:05 AM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Scala KafkaRDD uses a trait to handle this problem, but that is not so easy
 or straightforward in Python, where we would need a specific API to handle
 it. I'm not sure whether there is any simple workaround; maybe we should
 think about it carefully.

 2015-06-12 13:59 GMT+08:00 Amit Ramesh a...@yelp.com:


 Thanks, Jerry. That's what I suspected based on the code I looked at.
 Any pointers on what is needed to build in this support would be great.
 This is critical to the project we are currently working on.

 Thanks!


 On Thu, Jun 11, 2015 at 10:54 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 OK, I get it. I think the current Python-based Kafka direct API does not
 provide an equivalent to the Scala one; maybe we should figure out how to
 add this to the Python API as well.

 2015-06-12 13:48 GMT+08:00 Amit Ramesh a...@yelp.com:


 Hi Jerry,

 Take a look at this example:
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2

 The offsets are needed because, as RDDs get generated within Spark, the
 offsets move further along. With direct Kafka mode the current offsets are
 no longer persisted in ZooKeeper but rather within Spark itself. If you want
 to be able to use ZooKeeper-based monitoring tools to keep track of
 progress, then this is needed.

 In my specific case we need to persist Kafka offsets externally so
 that we can continue from where we left off after a code deployment. In
 other words, we need exactly-once processing guarantees across code
 deployments. Spark does not support any state persistence across
 deployments so this is something we need to handle on our own.

 Hope that helps. Let me know if not.

 Thanks!
 Amit


 On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao sai.sai.s...@gmail.com
  wrote:

 Hi,

 What do you mean by getting the offsets from the RDD? From my
 understanding, the offsetRange is a parameter you supply to KafkaRDD, so
 why do you still want to get back the one you previously set?

 Thanks
 Jerry

 2015-06-12 12:36 GMT+08:00 Amit Ramesh a...@yelp.com:


 Congratulations on the release of 1.4!

 I have been trying out the direct Kafka support in python but
 haven't been able to figure out how to get the offsets from the RDD. 
 Looks
 like the documentation is yet to be updated to include Python examples 
 (
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html).
 I am specifically looking for the equivalent of
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2.
 I tried digging through the python code but could not find anything
 related. Any pointers would be greatly appreciated.

 Thanks!
 Amit











Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?

2015-06-12 Thread Cody Koeninger
The Scala API has two ways of calling createDirectStream. One of them
allows you to pass a message handler that gets full access to the Kafka
MessageAndMetadata, including the offset.
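
A rough sketch of that second overload as it exists in the Spark 1.4 Scala
API (spark-streaming-kafka); ssc, kafkaParams, the topic name and the
starting offsets below are placeholders you would supply yourself:

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Explicit starting offsets, one entry per topic/partition (placeholder).
    val fromOffsets = Map(TopicAndPartition("my_topic", 0) -> 0L)

    // The message handler sees the full MessageAndMetadata, so the Kafka
    // offset can be kept alongside every record in the resulting DStream.
    val stream = KafkaUtils.createDirectStream[
        String, String, StringDecoder, StringDecoder, (Long, String)](
      ssc, kafkaParams, fromOffsets,
      (mmd: MessageAndMetadata[String, String]) => (mmd.offset, mmd.message()))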

I don't know why the Python API was developed with only one way to call
createDirectStream, but the first thing I'd look at would be adding that
functionality back in. If someone wants help creating a patch for that,
just let me know.

Dealing with offsets on a per-message basis may not be as efficient as
dealing with them on a batch basis using the HasOffsetRanges interface...
but if efficiency was a primary concern, you probably wouldn't be using
Python anyway.

On Fri, Jun 12, 2015 at 1:05 AM, Saisai Shao sai.sai.s...@gmail.com wrote:

 Scala KafkaRDD uses a trait to handle this problem, but that is not so easy
 or straightforward in Python, where we would need a specific API to handle
 it. I'm not sure whether there is any simple workaround; maybe we should
 think about it carefully.

 2015-06-12 13:59 GMT+08:00 Amit Ramesh a...@yelp.com:


 Thanks, Jerry. That's what I suspected based on the code I looked at. Any
 pointers on what is needed to build in this support would be great. This is
 critical to the project we are currently working on.

 Thanks!


 On Thu, Jun 11, 2015 at 10:54 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 OK, I get it. I think the current Python-based Kafka direct API does not
 provide an equivalent to the Scala one; maybe we should figure out how to
 add this to the Python API as well.

 2015-06-12 13:48 GMT+08:00 Amit Ramesh a...@yelp.com:


 Hi Jerry,

 Take a look at this example:
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2

 The offsets are needed because, as RDDs get generated within Spark, the
 offsets move further along. With direct Kafka mode the current offsets are
 no longer persisted in ZooKeeper but rather within Spark itself. If you want
 to be able to use ZooKeeper-based monitoring tools to keep track of
 progress, then this is needed.

 In my specific case we need to persist Kafka offsets externally so that
 we can continue from where we left off after a code deployment. In other
 words, we need exactly-once processing guarantees across code deployments.
 Spark does not support any state persistence across deployments so this is
 something we need to handle on our own.

 Hope that helps. Let me know if not.

 Thanks!
 Amit


 On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Hi,

 What do you mean by getting the offsets from the RDD? From my
 understanding, the offsetRange is a parameter you supply to KafkaRDD, so
 why do you still want to get back the one you previously set?

 Thanks
 Jerry

 2015-06-12 12:36 GMT+08:00 Amit Ramesh a...@yelp.com:


 Congratulations on the release of 1.4!

 I have been trying out the direct Kafka support in python but haven't
 been able to figure out how to get the offsets from the RDD. Looks like 
 the
 documentation is yet to be updated to include Python examples (
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html).
 I am specifically looking for the equivalent of
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2.
 I tried digging through the python code but could not find anything
 related. Any pointers would be greatly appreciated.

 Thanks!
 Amit









Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?

2015-06-11 Thread Amit Ramesh
Hi Jerry,

Take a look at this example:
https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2

The offsets are needed because, as RDDs get generated within Spark, the
offsets move further along. With direct Kafka mode the current offsets are
no longer persisted in ZooKeeper but rather within Spark itself. If you want
to be able to use ZooKeeper-based monitoring tools to keep track of
progress, then this is needed.
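
For anyone who doesn't want to follow the link, the Scala-side pattern at
tab_scala_2 is roughly the following (a sketch; directKafkaStream is assumed
to come from KafkaUtils.createDirectStream, and the processing step is a
placeholder):

    import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

    // Capture each batch's offset ranges before any other transformation,
    // then use them downstream, e.g. to report progress to a monitoring tool.
    var offsetRanges = Array.empty[OffsetRange]

    directKafkaStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.foreachRDD { rdd =>
      // ... process rdd ...
      offsetRanges.foreach { o =>
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
    }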

In my specific case we need to persist Kafka offsets externally so that we
can continue from where we left off after a code deployment. In other
words, we need exactly-once processing guarantees across code deployments.
Spark does not support any state persistence across deployments so this is
something we need to handle on our own.
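
As a point of comparison, with the existing Scala API that could look
roughly like the sketch below; saveOffsets is a hypothetical helper for
whatever external store is used, directKafkaStream is assumed to come from
KafkaUtils.createDirectStream, and exactly-once additionally requires the
output itself to be idempotent or transactional:

    import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

    // Hypothetical helper writing to an external store (not part of Spark).
    def saveOffsets(ranges: Array[OffsetRange]): Unit = ???

    directKafkaStream.foreachRDD { rdd =>
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... write the batch's output, ideally idempotently ...
      saveOffsets(ranges) // persist only after the output has succeeded
    }

    // On the next deployment, read the saved offsets back and pass them as
    // the fromOffsets argument of KafkaUtils.createDirectStream to resume.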

Hope that helps. Let me know if not.

Thanks!
Amit


On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao sai.sai.s...@gmail.com
wrote:

 Hi,

 What do you mean by getting the offsets from the RDD? From my
 understanding, the offsetRange is a parameter you supply to KafkaRDD, so
 why do you still want to get back the one you previously set?

 Thanks
 Jerry

 2015-06-12 12:36 GMT+08:00 Amit Ramesh a...@yelp.com:


 Congratulations on the release of 1.4!

 I have been trying out the direct Kafka support in python but haven't
 been able to figure out how to get the offsets from the RDD. Looks like the
 documentation is yet to be updated to include Python examples (
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html).
 I am specifically looking for the equivalent of
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2.
 I tried digging through the python code but could not find anything
 related. Any pointers would be greatly appreciated.

 Thanks!
 Amit





Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?

2015-06-11 Thread Saisai Shao
OK, I get it. I think the current Python-based Kafka direct API does not
provide an equivalent to the Scala one; maybe we should figure out how to
add this to the Python API as well.

2015-06-12 13:48 GMT+08:00 Amit Ramesh a...@yelp.com:


 Hi Jerry,

 Take a look at this example:
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2

 The offsets are needed because, as RDDs get generated within Spark, the
 offsets move further along. With direct Kafka mode the current offsets are
 no longer persisted in ZooKeeper but rather within Spark itself. If you want
 to be able to use ZooKeeper-based monitoring tools to keep track of
 progress, then this is needed.

 In my specific case we need to persist Kafka offsets externally so that we
 can continue from where we left off after a code deployment. In other
 words, we need exactly-once processing guarantees across code deployments.
 Spark does not support any state persistence across deployments so this is
 something we need to handle on our own.

 Hope that helps. Let me know if not.

 Thanks!
 Amit


 On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Hi,

 What do you mean by getting the offsets from the RDD? From my
 understanding, the offsetRange is a parameter you supply to KafkaRDD, so
 why do you still want to get back the one you previously set?

 Thanks
 Jerry

 2015-06-12 12:36 GMT+08:00 Amit Ramesh a...@yelp.com:


 Congratulations on the release of 1.4!

 I have been trying out the direct Kafka support in python but haven't
 been able to figure out how to get the offsets from the RDD. Looks like the
 documentation is yet to be updated to include Python examples (
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html).
 I am specifically looking for the equivalent of
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2.
 I tried digging through the python code but could not find anything
 related. Any pointers would be greatly appreciated.

 Thanks!
 Amit






Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?

2015-06-11 Thread Saisai Shao
Hi,

What do you mean by getting the offsets from the RDD? From my
understanding, the offsetRange is a parameter you supply to KafkaRDD, so
why do you still want to get back the one you previously set?

Thanks
Jerry

2015-06-12 12:36 GMT+08:00 Amit Ramesh a...@yelp.com:


 Congratulations on the release of 1.4!

 I have been trying out the direct Kafka support in python but haven't been
 able to figure out how to get the offsets from the RDD. Looks like the
 documentation is yet to be updated to include Python examples (
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html). I
 am specifically looking for the equivalent of
 https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2.
 I tried digging through the python code but could not find anything
 related. Any pointers would be greatly appreciated.

 Thanks!
 Amit