Re: Reading Real Time Data only from Kafka
Cool. Thanks for the detailed response Cody. Thanks Best Regards
Re: Reading Real Time Data only from Kafka
If those questions aren't answered by https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md please let me know so I can update it. If you set auto.offset.reset to largest, it will start at the largest offset. Any messages before that will be skipped, so if prior runs of the job didn't consume them, they're lost. KafkaRDD / DirectStream doesn't make any scheduling decisions (aside from a locality hint if you have kafka running on the same node as spark), and it doesn't have any long-running receivers. Executors get whatever partitions the normal scheduler decides they should get. If an executor fails, a different executor reads the offset range for the failed partition; they're immutable, so no difference in result. Deciding where to save offsets (or not) is up to you. You can checkpoint, or store them yourself.
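A minimal sketch of the store-them-yourself option, assuming a stream built with KafkaUtils.createDirectStream; HasOffsetRanges and OffsetRange are part of the spark-streaming-kafka module, while saveOffsets is a hypothetical stand-in for a write to your own offset store:

    import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

    // Hypothetical sink; replace with a write to your own database.
    def saveOffsets(ranges: Array[OffsetRange]): Unit =
      ranges.foreach(r => println(s"${r.topic}/${r.partition}: ${r.fromOffset} -> ${r.untilOffset}"))

    stream.foreachRDD { rdd =>
      // Each RDD from the direct stream carries its own immutable offset ranges.
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... process the rdd ...
      saveOffsets(ranges) // commit only after processing succeeds
    }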
Re: Reading Real Time Data only from Kafka
I have played a bit with the directStream kafka api. Good work Cody. These are my findings; also, can you clarify a few things for me (see below)?
- When auto.offset.reset = smallest and you have 60GB of messages in Kafka, it takes forever, as it reads the whole 60GB at once. largest will only read the latest messages.
- To avoid this, you can actually limit the rate with spark.streaming.kafka.maxRatePerPartition, which is pretty stable (always reads the same amount of data; see the sketch after this message).
- Number of partitions per batch = number of kafka partitions.
- In the case of driver failures, offset reset set to smallest will replay all the messages, and largest will only read those messages which are pushed after the streaming job has started. What happens to those messages which arrive in between?
*Few things which are unclear:*
- If we have a kafka topic with 9 partitions, and a spark cluster with 3 slaves, how does it decide which slave should read from which partition? And what happens if a single slave fails while reading the data?
- By default it doesn't push the offsets of messages which are read anywhere, so how does it replay the messages in case of failures?
Thanks Best Regards
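On the rate limiting point, a sketch of capping the first batch when auto.offset.reset is smallest; spark.streaming.kafka.maxRatePerPartition is the actual 1.3 config key, while the broker address, topic name, and rate value are placeholders:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf()
      .setAppName("direct-stream")
      // Cap each Kafka partition at 10,000 messages per second per batch,
      // so starting from smallest doesn't pull the whole backlog at once.
      .set("spark.streaming.kafka.maxRatePerPartition", "10000")
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map(
      "metadata.broker.list" -> "broker1:9092",
      "auto.offset.reset" -> "smallest")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("mytopic"))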
Re: Reading Real Time Data only from Kafka
As far as I can tell, Dibyendu's cons boil down to:
1. Spark checkpoints can't be recovered if you upgrade code
2. Some Spark transformations involve a shuffle, which can repartition data
It's not accurate to imply that either one of those things is inherently a con of the direct stream api. Regarding checkpoints, nothing about the direct stream requires you to use checkpoints. You can save offsets in a checkpoint, your own database, or not save offsets at all (as James wants). One might even say that the direct stream api is . . . flexible . . . in that regard. Regarding partitions, the direct stream api gives you the same ordering guarantee as Kafka, namely that within a given partition messages will be in increasing offset order. Clearly, if you do a transformation that repartitions the stream, that no longer holds. Thing is, that doesn't matter if you're saving offsets and results for each rdd in the driver. The offset ranges for the original rdd don't change as a result of the transformation you executed; they're immutable. Sure, you can get into trouble if you're trying to save offsets / results per partition on the executors after a shuffle of some kind. You can avoid this pretty easily by just using normal scala code to do your transformation on the iterator inside a foreachPartition (sketched below). Again, this isn't a con of the direct stream api; this is just a need to understand how Spark works.
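A sketch of that foreachPartition pattern, assuming the (key, message) pairs produced by a String-decoded direct stream; the uppercase transformation and the saveToStore sink are hypothetical:

    // Hypothetical per-record sink standing in for your real store.
    def saveToStore(result: String): Unit = println(result)

    stream.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        // No shuffle has happened, so this Spark partition still corresponds
        // 1:1 to a Kafka partition and its offset range.
        iter.map { case (key, msg) => msg.toUpperCase } // plain scala on the iterator
          .foreach(saveToStore)
      }
    }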
Re: Reading Real Time Data only from Kafka
Thanks Cody for your email. I think my concern was not about getting the ordering of messages within a partition, which as you said is possible if one knows how Spark works. The issue is how Spark schedules jobs on every batch, which is not in the same order they were generated. So if that is not guaranteed, it does not matter whether you manage order within your partition; depending on per-partition ordering to commit offsets may lead to offsets being committed in the wrong order. In this thread you have discussed this as well, along with some workarounds: https://mail.google.com/mail/u/1/?tab=wm#search/rdd+order+guarantees/14b9f1eaf0b8bd15 So again, one needs to understand every detail of a consumer to decide whether it solves their use case. Regards, Dibyendu
Re: Reading Real Time Data only from Kafka
You linked to a google mail tab, not a public archive, so I don't know exactly which conversation you're referring to. As far as I know, streaming only runs a single job at a time in the order they were defined, unless you turn on an experimental option for more parallelism (TD or someone more knowledgeable can chime in on this). If you're talking about the possibility of the next job starting before the prior one has fully finished, because your processing is lagging behind... I'm not 100% sure this is possible because I've never observed it. The thing is, it's a moot point, because if you're saving offsets yourself transactionally, you already need to be verifying that offsets are correct (increasing without gaps) in order to handle restarts correctly. If you're super concerned about how batches get generated, the direct api gives you access to KafkaUtils.createRDD... just schedule your own rdds in the order you want. Again, flexible.
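A sketch of that createRDD escape hatch, with made-up topic and offset numbers and an assumed SparkContext sc; KafkaUtils.createRDD and OffsetRange are the real 1.3 API:

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    // One RDD per explicit offset range; run them in whatever order you want.
    val first = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, Array(OffsetRange("mytopic", 0, 0L, 1000L)))
    val second = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, Array(OffsetRange("mytopic", 0, 1000L, 2000L)))
    first.count()  // force the first batch to completion
    second.count() // before submitting the next one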
Reading Real Time Data only from Kafka
What I want is: if the driver dies for some reason and is restarted, I want to read only messages that arrived in Kafka following the restart of the driver program and re-connection to Kafka. Has anyone done this? Any links or resources that can help explain this? Regards jk
Re: Reading Real Time Data only from Kafka
Yep, you can try this low-level Kafka receiver: https://github.com/dibbhatt/kafka-spark-consumer. It's much more flexible/reliable than the one that comes with Spark. Thanks Best Regards
Re: Reading Real Time Data only from Kafka
Very nice! Will try and let you know, thanks.
Re: Reading Real Time Data only from Kafka
Akhil, I hope I'm misreading the tone of this. If you have personal issues at stake, please take them up outside of the public list. If you have actual factual concerns about the kafka integration, please share them in a jira. Regarding reliability, here's a screenshot of a current production job with a 3 week uptime: http://tinypic.com/r/2e4vkht/8 It was up for a month before that; I only took it down to change code. Regarding flexibility, both of the apis available in spark will do what James needs, as I described.
Re: Reading Real Time Data only from Kafka
Many thanks both, appreciate the help.
Re: Reading Real Time Data only from Kafka
Hi Cody, I was just saying that I found more success and higher throughput with the low-level kafka api prior to KafkaRDDs, which it seems are the future. My apologies if it came across that way. :)
Re: Reading Real Time Data only from Kafka
Yes, that's what happens by default. If you want to be super accurate about it, you can also specify the exact starting offsets for every topic/partition.
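A sketch of that exact-starting-offsets variant, assuming a StreamingContext ssc; TopicAndPartition, the fromOffsets map, and the messageHandler argument are the real createDirectStream overload, while the broker, topic, and offset values are placeholders (e.g. starting at message 16 from James's scenario):

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    // Start each topic/partition at an offset you choose.
    val fromOffsets = Map(
      TopicAndPartition("mytopic", 0) -> 16L,
      TopicAndPartition("mytopic", 1) -> 16L)
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets,
      (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))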
Re: Reading Real Time Data only from Kafka
The low-level consumer which Akhil mentioned has been running at Pearson for the last 4-5 months without any downtime. I think this is the most reliable receiver-based Kafka consumer for Spark as of today, if you want to put it that way. Prior to Spark 1.3, other receiver-based consumers used the Kafka high-level API, which has serious issues with re-balancing, is less fault tolerant, and can lose data. Cody's implementation using the direct stream is definitely a good approach, but both the direct-stream-based approach and the receiver-based low-level consumer approach have pros and cons. For example, the receiver-based approach needs to use a WAL for recovery from driver failure, which is an overhead for a system like Kafka (see the sketch after this message). For the direct stream, the offsets stored in the checkpoint directory are lost if the driver code is modified; you can manage offsets from your driver, but for a derived stream generated from this direct stream there is no guarantee that batches are processed in order (and offsets committed in order), etc. So whoever uses whichever consumer needs to study the pros and cons of both approaches before taking a call. Regards, Dibyendu
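For context on that WAL overhead, a sketch of what enabling it looks like for a receiver-based job; the config key and the checkpoint requirement are standard Spark Streaming, and the app name and checkpoint path are placeholders:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("receiver-based")
      // Every received block is also written to a write ahead log in the
      // checkpoint directory so it can be replayed after a driver failure;
      // extra I/O the direct stream avoids by re-reading ranges from Kafka.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("hdfs:///tmp/streaming-checkpoint") // the WAL lives here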
Re: Reading Real Time Data only from Kafka
I don't think it's accurate for Akhil to claim that the linked library is much more flexible/reliable than what's available in Spark at this point. James, what you're describing is the default behavior for the createDirectStream api available as part of spark since 1.3. The kafka parameter auto.offset.reset defaults to largest, ie start at the most recent available message. This is described at http://spark.apache.org/docs/latest/streaming-kafka-integration.html The createDirectStream api implementation is described in detail at https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md If for some reason you're stuck using an earlier version of spark, you can accomplish what you want simply by starting the job using a new consumer group (there will be no prior state in zookeeper, so it will start consuming according to auto.offset.reset)
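A sketch of that new-consumer-group trick on pre-1.3 Spark, using the receiver-based KafkaUtils.createStream with an assumed StreamingContext ssc; the zookeeper address, topic map, and group-naming scheme are placeholders:

    import org.apache.spark.streaming.kafka.KafkaUtils

    // A group id that has never consumed: there is no prior state in
    // zookeeper, so consumption starts per auto.offset.reset
    // (largest, i.e. newest messages only, by default).
    val freshGroup = "myapp-" + System.currentTimeMillis
    val stream = KafkaUtils.createStream(
      ssc, "zkhost:2181", freshGroup, Map("mytopic" -> 1))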
Re: Reading Real Time Data only from Kafka
Hi Cody, if you are so sure, can you share a benchmark (which you ran for days, maybe?) that you have done with the Kafka APIs provided by Spark? Thanks Best Regards
Re: Reading Real Time Data only from Kafka
Thanks Cody. Here are the events:
- Spark app connects to Kafka the first time and starts consuming
- Messages 1 - 10 arrive at Kafka, then the Spark app gets them
- Now the driver dies
- Messages 11 - 15 arrive at Kafka
- The Spark driver program reconnects
- Then messages 16 - 20 arrive at Kafka
What I want is that Spark ignores 11 - 15 but processes 16 - 20, since they arrived after the driver reconnected to Kafka. Is this what happens by default in your suggestion?