Re: Reading Real Time Data only from Kafka

2015-05-19 Thread Akhil Das
Cool. Thanks for the detailed response Cody.

Thanks
Best Regards

Re: Reading Real Time Data only from Kafka

2015-05-19 Thread Cody Koeninger
If those questions aren't answered by

https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md

please let me know so I can update it.

If you set auto.offset.reset to largest, it will start at the largest
offset.  Any messages before that will be skipped, so if prior runs of the
job didn't consume them, they're lost.

KafkaRDD / DirectStream doesn't make any scheduling decisions (aside from a
locality hint if you have kafka running on the same node as spark), and it
doesn't have any long-running receivers.  Executors get whatever partitions
the normal scheduler decides they should get.  If an executor fails, a
different executor reads the offset range for the failed partition; offset
ranges are immutable, so there's no difference in the result.

Deciding where to save offsets (or not) is up to you.  You can checkpoint,
or store them yourself.
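
To make that concrete, here is a minimal Scala sketch (broker address, topic
name and batch interval are placeholder assumptions) of the direct stream
started at the largest offset, with the per-batch offset ranges read back in
the driver so you can store them wherever you like:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val conf = new SparkConf().setAppName("direct-stream-sketch")
val ssc = new StreamingContext(conf, Seconds(10))        // assumed batch interval

// "largest" skips whatever is already in the topic and starts at the newest offset
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "broker1:9092",              // placeholder broker list
  "auto.offset.reset" -> "largest")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))                      // placeholder topic

stream.foreachRDD { rdd =>
  // the offset ranges for this batch are fixed; checkpoint them or save them yourself
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreach { case (_, value) => println(value) }      // your processing goes here
  // e.g. persist offsetRanges along with your results
}

ssc.start()
ssc.awaitTermination()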

Re: Reading Real Time Data only from Kafka

2015-05-18 Thread Akhil Das
I have played a bit with the directStream kafka api. Good work Cody. These
are my findings; also, can you clarify a few things for me (see below)?

- When auto.offset.reset = smallest and you have 60GB of messages in
Kafka, the first batch takes forever since it reads the whole 60GB at once;
largest will only read the latest messages.
- To avoid this, you can actually limit the rate with
spark.streaming.kafka.maxRatePerPartition, which is pretty stable (always
reads the same amount of data); see the config sketch after this list.
- Number of partitions per batch = number of kafka partitions.

- In the case of driver failure, offset reset set to smallest will replay
all the messages, and largest will only read those messages which are pushed
after the streaming job has started. What happens to those messages which
arrive in between?
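
For the rate limiting mentioned above, a minimal sketch of the configuration
(the value 10000 is only an illustrative assumption; tune it to your
partition throughput):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// caps the number of messages each Kafka partition contributes per second,
// so a job started with auto.offset.reset = smallest doesn't pull the whole
// backlog into a single batch
val conf = new SparkConf()
  .setAppName("rate-limited-direct-stream")
  .set("spark.streaming.kafka.maxRatePerPartition", "10000")   // assumed value

val ssc = new StreamingContext(conf, Seconds(10))              // assumed batch interval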

*A few things which are unclear:*

- If we have a kafka topic with 9 partitions and a spark cluster with 3
slaves, how does it decide which slave should read from which partition?
And what happens if a single slave fails while reading the data?

- By default it doesn't record the offsets of the messages it has read
anywhere, so how does it replay the messages in case of failure?

Thanks
Best Regards

Re: Reading Real Time Data only from Kafka

2015-05-13 Thread Cody Koeninger
As far as I can tell, Dibyendu's cons boil down to:

1. Spark checkpoints can't be recovered if you upgrade code
2. Some Spark transformations involve a shuffle, which can repartition data

It's not accurate to imply that either one of those things is inherently a
con of the direct stream api.

Regarding checkpoints, nothing about the direct stream requires you to use
checkpoints.  You can save offsets in a checkpoint, your own database, or
not save offsets at all (as James wants).  One might even say that the
direct stream api is . . . flexible . . . in that regard.

Regarding partitions, the direct stream api gives you the same ordering
guarantee as Kafka, namely that within a given partition messages will be
in increasing offset order.   Clearly if you do a transformation that
repartitions the stream, that no longer holds.  Thing is, that doesn't
matter if you're saving offsets and results for each rdd in the driver.
The offset ranges for the original rdd don't change as a result of the
transformation you executed, they're immutable.

Sure, you can get into trouble if you're trying to save offsets / results
per partition on the executors, after a shuffle of some kind. You can avoid
this pretty easily by just using normal scala code to do your
transformation on the iterator inside a foreachPartition.  Again, this
isn't a con of the direct stream api; it just requires understanding how
Spark works.
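
As a rough illustration of that last point, a sketch (topic and kafka
parameters are assumed to be set up as elsewhere in this thread) of doing the
per-partition work with plain Scala inside foreachPartition, while the offset
ranges are captured in the driver from the original, unshuffled rdd:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

// assumes ssc and kafkaParams are already defined
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))                  // placeholder topic

stream.foreachRDD { rdd =>
  // read the offset ranges in the driver, before any shuffle happens
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.foreachPartition { iter =>
    // plain Scala on the iterator: no repartitioning, so Kafka's
    // within-partition ordering still holds here
    iter.foreach { case (_, value) =>
      // process / save the message
    }
  }

  // then persist offsetRanges from the driver, alongside your results
}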



Re: Reading Real Time Data only from Kafka

2015-05-13 Thread Dibyendu Bhattacharya
Thanks Cody for your email. I think my concern was not about getting the
ordering of messages within a partition, which as you said is possible if one
knows how Spark works. The issue is how Spark schedules the jobs for every
batch, which is not necessarily the same order in which they were generated.
If that is not guaranteed, it does not matter whether you manage order within
your partition: depending on per-partition ordering to commit offsets may
lead to offsets being committed in the wrong order.

In this thread you have discussed this as well, along with some workarounds:

https://mail.google.com/mail/u/1/?tab=wm#search/rdd+order+guarantees/14b9f1eaf0b8bd15

So again, one needs to understand every detail of a consumer before deciding
whether it solves their use case.

Regards,
Dibyendu

Re: Reading Real Time Data only from Kafka

2015-05-13 Thread Cody Koeninger
You linked to a google mail tab, not a public archive, so I don't know
exactly which conversation you're referring to.

As far as I know, streaming only runs a single job at a time, in the order
the jobs were defined, unless you turn on an experimental option for more
parallelism (TD or someone more knowledgeable can chime in on this).  If
you're talking about the possibility of the next job starting before the
prior one has fully finished, because your processing is lagging behind...
I'm not 100% sure this is possible because I've never observed it.

The thing is, it's a moot point, because if you're saving offsets yourself
transactionally, you already need to be verifying that offsets are correct
(increasing without gaps) in order to handle restarts correctly.

If you're super concerned about how batches get generated, the direct api
gives you access to KafkaUtils.createRDD... just schedule your own rdds in
the order you want.  Again, flexible.
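
For reference, a minimal sketch of that createRDD route (topic name,
partition numbers and offsets are made-up values); scheduling of the
resulting batch rdds is then entirely under your control:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

// explicit, immutable offset ranges: topic, partition, fromOffset, untilOffset
val offsetRanges = Array(
  OffsetRange("mytopic", 0, 110, 220),
  OffsetRange("mytopic", 1, 100, 313))

// assumes sc (SparkContext) and kafkaParams are already defined
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)

rdd.foreach { case (_, value) => println(value) }    // process the batch as you see fit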




Reading Real Time Data only from Kafka

2015-05-12 Thread James King
What I want is: if the driver dies for some reason and is restarted, I want
to read only the messages that arrived in Kafka after the restart of the
driver program and its re-connection to Kafka.

Has anyone done this? Any links or resources that can help explain this?

Regards
jk


Re: Reading Real Time Data only from Kafka

2015-05-12 Thread Akhil Das
Yep, you can try this low-level Kafka receiver:
https://github.com/dibbhatt/kafka-spark-consumer. It's much more
flexible/reliable than the one that comes with Spark.

Thanks
Best Regards

Re: Reading Real Time Data only from Kafka

2015-05-12 Thread James King
Very nice! Will try and let you know, thanks.

Re: Reading Real Time Data only from Kafka

2015-05-12 Thread Cody Koeninger
Akhil, I hope I'm misreading the tone of this. If you have personal issues
at stake, please take them up outside of the public list.  If you have
actual factual concerns about the kafka integration, please share them in a
jira.

Regarding reliability, here's a screenshot of a current production job with
a 3 week uptime.  It was up for a month before that; I only took it down to
change code.

http://tinypic.com/r/2e4vkht/8

Regarding flexibility, both of the apis available in spark will do what
James needs, as I described.



Re: Reading Real Time Data only from Kafka

2015-05-12 Thread James King
Many thanks both, appreciate the help.

Re: Reading Real Time Data only from Kafka

2015-05-12 Thread Akhil Das
Hi Cody,
I was just saying that I found more success and higher throughput with the
low level kafka api prior to KafkaRDDs, which it seems is the future. My
apologies if you felt it that way. :)

Re: Reading Real Time Data only from Kafka

2015-05-12 Thread Cody Koeninger
Yes, that's what happens by default.

If you want to be super accurate about it, you can also specify the exact
starting offsets for every topic/partition.
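
A sketch of that variant, with made-up partition numbers and starting
offsets; the message handler here just keeps the value:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// start consuming exactly at these offsets (values are illustrative only)
val fromOffsets = Map(
  TopicAndPartition("mytopic", 0) -> 16L,
  TopicAndPartition("mytopic", 1) -> 16L)

// assumes ssc and kafkaParams are already defined
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => mmd.message)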

Re: Reading Real Time Data only from Kafka

2015-05-12 Thread Dibyendu Bhattacharya
The low level consumer which Akhil mentioned has been running at Pearson for
the last 4-5 months without any downtime. I think this is the reliable
receiver-based Kafka consumer for Spark as of today, if you want to put it
that way.

Prior to Spark 1.3, other receiver-based consumers used the Kafka high level
API, which has serious issues with re-balancing, is less fault tolerant, and
can lose data.

Cody's implementation using the direct stream is definitely a good approach,
but both the direct stream based approach and the receiver based low level
consumer approach have pros and cons. For example, the receiver based
approach needs a WAL for recovery from driver failure, which is an overhead
for a system like Kafka. For the direct stream, offsets stored in the
checkpoint directory are lost if the driver code is modified; you can manage
offsets from your driver, but for a derived stream generated from this direct
stream there is no guarantee that batches are processed in order (and that
offsets are committed in order), etc.
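
For completeness, a minimal sketch of the WAL plus checkpoint recovery setup
being referred to for the receiver-based approach (checkpoint directory, app
name and batch interval are assumptions):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("receiver-based-kafka")                               // placeholder name
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")     // WAL for received data

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(conf, Seconds(10))                 // assumed batch interval
  ssc.checkpoint("hdfs:///tmp/checkpoints")                         // placeholder checkpoint dir
  // set up the receiver-based Kafka stream and processing here
  ssc
}

// recover from the checkpoint after a driver failure, or create a fresh context
val ssc = StreamingContext.getOrCreate("hdfs:///tmp/checkpoints", createContext _)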

So whoever uses whichever consumer needs to study the pros and cons of both
approaches before taking a call.

Regards,
Dibyendu







Re: Reading Real Time Data only from Kafka

2015-05-12 Thread Cody Koeninger
I don't think it's accurate for Akhil to claim that the linked library is
much more flexible/reliable than what's available in Spark at this point.

James, what you're describing is the default behavior for the
createDirectStream api available as part of spark since 1.3.  The kafka
parameter auto.offset.reset defaults to largest, ie start at the most
recent available message.

This is described at
http://spark.apache.org/docs/latest/streaming-kafka-integration.html  The
createDirectStream api implementation is described in detail at
https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md

If for some reason you're stuck using an earlier version of spark, you can
accomplish what you want simply by starting the job using a new consumer
group (there will be no prior state in zookeeper, so it will start
consuming according to auto.offset.reset).
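
If you do end up on the receiver-based api with a fresh consumer group, the
idea is roughly this (group id, zookeeper address and topic are placeholder
assumptions):

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// a brand-new group.id has no offsets stored in zookeeper, so consumption
// starts according to auto.offset.reset (largest = newest messages only)
val kafkaParams = Map[String, String](
  "zookeeper.connect" -> "zkhost:2181",                       // placeholder zookeeper
  "group.id" -> "fresh-group-" + System.currentTimeMillis,    // new group on every start
  "auto.offset.reset" -> "largest")

// assumes ssc is already defined
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map("mytopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)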

Re: Reading Real Time Data only from Kafka

2015-05-12 Thread Akhil Das
Hi Cody,

If you are so sure, can you share a benchmark (which you ran for days,
maybe?) that you have done with the Kafka APIs provided by Spark?

Thanks
Best Regards

Re: Reading Real Time Data only from Kafka

2015-05-12 Thread James King
Thanks Cody.

Here are the events:

- Spark app connects to Kafka first time and starts consuming
- Messages 1 - 10 arrive at Kafka then Spark app gets them
- Now driver dies
- Messages 11 - 15 arrive at Kafka
- Spark driver program reconnects
- Then messages 16 - 20 arrive at Kafka

What I want is that Spark ignores 11 - 15,
but processes 16 - 20, since they arrived after the driver reconnected
to Kafka.

Is this what happens by default in your suggestion?




