Re: committing offset metadata in kafka streams
Sure. Sorry I was not clear. Thank you!

On Sat, 10 Mar 2018 at 00:54, Matthias J. Sax wrote:
> If there is only one partition per task, processing order is guaranteed.
>
> For the default partition grouper, it depends on your program. If you read
> from multiple topics and join/merge them, a task gets multiple
> partitions (from different topics) assigned.
>
> [...]
Re: committing offset metadata in kafka streams
If there is only one partition per task, processing order is guaranteed.

For the default partition grouper, it depends on your program. If you read
from multiple topics and join/merge them, a task gets multiple partitions
(from different topics) assigned.

-Matthias

On 3/9/18 2:42 PM, Stas Chizhov wrote:
> [...]
Re: committing offset metadata in kafka streams
> Also note that the processing order might slightly differ if you
> process the same data twice

Is this still a problem when the default partition grouper is used (with 1
partition per task)?

Thank you,
Stanislav.

2018-03-09 3:19 GMT+01:00 Matthias J. Sax:
> [...]
Re: committing offset metadata in kafka streams
Thanks for the explanation.

Not sure if setting the metadata you want to get committed in
punctuation() would be sufficient. But I would think about it in more
detail if we get a KIP for this.

It's correct that flushing and committing offsets are correlated. But
it's not related to punctuation.

Also note that the processing order might slightly differ if you
process the same data twice (it depends on the order in which the brokers
return data on poll(), and that is something Streams cannot fully
control). Thus, your code would need to be "robust" against different
processing orders (i.e., if there are multiple input partitions, you might
get data first for partition 0 and then for partition 1, or the other way
round -- the order per partition is guaranteed to be offset order).

-Matthias

On 3/6/18 2:17 AM, Stas Chizhov wrote:
> [...]
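The ordering point above can be illustrated with a small stand-alone sketch (no Kafka involved; the partition and record names are made up): the cross-partition interleaving may differ from run to run, but within each partition the offset order always holds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy illustration: two different interleavings of the same records from
// partitions p0 and p1, as the brokers might return them on two different runs.
// Records are labeled "<partition>-<offset>".
public class InterleavingDemo {

    // Check that records of one partition appear in strictly increasing
    // offset order within the merged stream.
    static boolean perPartitionOrderOk(List<String> merged, String partitionPrefix) {
        int last = -1;
        for (String rec : merged) {
            if (rec.startsWith(partitionPrefix)) {
                int offset = Integer.parseInt(rec.substring(partitionPrefix.length()));
                if (offset <= last) {
                    return false;
                }
                last = offset;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Two possible global orders for the same data.
        List<String> runA = Arrays.asList("p0-0", "p0-1", "p1-0", "p1-1");
        List<String> runB = Arrays.asList("p1-0", "p0-0", "p1-1", "p0-1");

        // The global interleaving differs between the runs...
        System.out.println(runA.equals(runB));
        // ...but per-partition offset order holds in both.
        System.out.println(perPartitionOrderOk(runA, "p0-") && perPartitionOrderOk(runA, "p1-"));
        System.out.println(perPartitionOrderOk(runB, "p0-") && perPartitionOrderOk(runB, "p1-"));
    }
}
```

This is why code that must be deterministic under reprocessing can only rely on per-partition order, not on the interleaving across partitions.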
Re: committing offset metadata in kafka streams
Thank you, Matthias!

We currently do use the Kafka consumer and store event-time high watermarks
as offset metadata. This is used during a backup procedure, which is to
create a snapshot of the target storage with all events up to a certain
timestamp and no others.

As for the API - I guess being able to provide a partition-to-metadata map
in the context commit method would do it (to be called from within the
punctuate method). BTW, as far as I understand, if the Processor API is
used, flushing producers and committing offsets are correlated, and both
the output topic state and the committed offsets correspond to the state at
the moment of some punctuation. Meaning that if I do have a deterministic
processing topology, my output topic is going to be deterministic as well
(modulo duplicates, of course). Am I correct here?

Best regards,
Stanislav.

2018-03-05 2:31 GMT+01:00 Matthias J. Sax:
> [...]
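For the consumer-based approach, a minimal self-contained sketch of the idea (the plain-string encoding and all names here are illustrative, not our actual backup code): the per-partition event-time high watermark is packed into the metadata string that `commitSync(Map<TopicPartition, OffsetAndMetadata>)` accepts alongside each offset.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: encode an event-time high watermark into the per-partition metadata
// string that the consumer can commit together with the offset. Kept free of
// Kafka dependencies so the encode/decode round trip can run on its own.
public class WatermarkMetadata {

    // Encode the high watermark (epoch millis) for the metadata field.
    static String encode(long highWatermarkMs) {
        return Long.toString(highWatermarkMs);
    }

    // Decode the watermark back when reading committed offsets during backup.
    static long decode(String metadata) {
        return Long.parseLong(metadata);
    }

    public static void main(String[] args) {
        // Per-partition watermarks observed while processing (toy values).
        Map<Integer, Long> watermarks = new HashMap<>();
        watermarks.put(0, 1520600000000L);
        watermarks.put(1, 1520600123456L);

        // Build the metadata strings that would accompany each committed offset;
        // with the real client each entry would become an OffsetAndMetadata.
        Map<Integer, String> metadataByPartition = new HashMap<>();
        for (Map.Entry<Integer, Long> e : watermarks.entrySet()) {
            metadataByPartition.put(e.getKey(), encode(e.getValue()));
        }

        System.out.println(decode(metadataByPartition.get(1)));
    }
}
```

The backup job can then read the committed metadata back and cut the snapshot at the minimum watermark across partitions.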
Re: committing offset metadata in kafka streams
You are correct. This is not possible atm.

Note that commits happen "under the hood" and users cannot commit
explicitly. Users can only "request" a commit -- this implies that Kafka
Streams will commit as soon as possible -- but when `context#commit()`
returns, the commit is not done yet (it only sets a flag).

What is your use case for this? How would you want to use this from an API
point of view?

Feel free to open a feature request JIRA -- we don't have any plans to add
this atm -- it's the first time anybody asks for this feature. If there is
a JIRA, maybe somebody picks it up :)

-Matthias

On 3/3/18 6:51 AM, Stas Chizhov wrote:
> [...]
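The "flag only" semantics can be modeled in a few lines of plain Java (class and method names are invented for illustration; this is not the Kafka Streams implementation): the request returns immediately, and the actual commit happens at the next point where the processing loop checks the flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Toy model of commit-request semantics: requestCommit() only raises a flag,
// and the runtime performs the real commit at its next safe point.
public class CommitFlagModel {

    private final AtomicBoolean commitRequested = new AtomicBoolean(false);
    final List<String> log = new ArrayList<>();

    // What a commit request conceptually does: set a flag and return at once.
    void requestCommit() {
        commitRequested.set(true);
        log.add("commit requested (flag set, nothing committed yet)");
    }

    // The processing loop checks the flag after handling each record and
    // performs the actual commit then.
    void processRecord(String record) {
        log.add("processed " + record);
        if (commitRequested.getAndSet(false)) {
            log.add("offsets committed");
        }
    }

    public static void main(String[] args) {
        CommitFlagModel model = new CommitFlagModel();
        model.processRecord("r1");
        model.requestCommit();      // returns immediately; commit still pending
        model.processRecord("r2");  // the real commit happens after this record
        for (String line : model.log) {
            System.out.println(line);
        }
    }
}
```

Note the gap between "commit requested" and "offsets committed" in the log: any metadata attached at request time would have to stay valid until the commit actually happens.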
committing offset metadata in kafka streams
Hi,

There seems to be no way to commit custom metadata along with offsets from
within Kafka Streams. Are there any plans to expose this functionality or
have I missed something?

Best regards,
Stanislav.