Re: committing offset metadata in kafka streams

2018-03-09 Thread Stas Chizhov
Sure. Sorry I was not clear.

Thank you!




Re: committing offset metadata in kafka streams

2018-03-09 Thread Matthias J. Sax
If there is only one partition per task, processing order is guaranteed.

For the default partition grouper, it depends on your program. If you
read from multiple topics and join/merge them, a task gets multiple
partitions (from different topics) assigned.
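
For example, a minimal sketch of such a program (the topic names and
the join window are made up, and both input topics must be
co-partitioned):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> orders = builder.stream("orders");
KStream<String, String> payments = builder.stream("payments");

// Each task now owns one "orders" partition and one "payments"
// partition; the interleaving of records between the two topics is
// not fully deterministic across runs.
KStream<String, String> joined = orders.join(payments,
        (order, payment) -> order + "/" + payment,
        JoinWindows.of(60_000L));  // 1-minute join window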


-Matthias

On 3/9/18 2:42 PM, Stas Chizhov wrote:
>> Also note, that the processing order might slightly differ if you
>> process the same data twice
> 
> Is this still a problem when the default partition grouper is used
> (with one partition per task)?
> 
> Thank you,
> Stanislav.



Re: committing offset metadata in kafka streams

2018-03-09 Thread Stas Chizhov
> Also note, that the processing order might slightly differ if you
> process the same data twice

Is this still a problem when the default partition grouper is used
(with one partition per task)?

Thank you,
Stanislav.





Re: committing offset metadata in kafka streams

2018-03-08 Thread Matthias J. Sax
Thanks for the explanation.

I'm not sure whether setting the metadata you want committed from
within punctuate() would be sufficient, but I would think about it in
more detail if we get a KIP for this.

It's correct that flushing and committing offsets are correlated, but
neither is related to punctuation.

Also note that the processing order might differ slightly if you
process the same data twice (it depends on the order in which the
brokers return data on poll(), and that is something Streams cannot
fully control). Thus, your code would need to be "robust" against
different processing orders (i.e., if there are multiple input
partitions, you might get data first for partition 0 and then for
partition 1, or the other way round -- only the order within each
partition is guaranteed to be offset order).


-Matthias








Re: committing offset metadata in kafka streams

2018-03-06 Thread Stas Chizhov
Thank you, Matthias!

We currently use the plain Kafka consumer and store event-time high
watermarks as offset metadata. This is used during a backup procedure,
which creates a snapshot of the target storage containing all events up
to a certain timestamp and no others.
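
For reference, a minimal sketch of what we do today with the plain
consumer (the helper name, topic, and values are placeholders):

import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Commit an offset together with the event-time high watermark,
// encoded as the metadata string for that partition.
static void commitWithWatermark(Consumer<?, ?> consumer,
                                TopicPartition partition,
                                long nextOffset,
                                long eventTimeHighWatermark) {
    consumer.commitSync(Collections.singletonMap(
            partition,
            new OffsetAndMetadata(nextOffset,
                    Long.toString(eventTimeHighWatermark))));
}

The backup job then reads the watermark back via
consumer.committed(partition).metadata().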

As for the API: I guess being able to provide a partition-to-metadata
map in the context's commit method (to be called from within the
punctuate method) would do it. BTW, as far as I understand, when the
Processor API is used, flushing producers and committing offsets are
correlated, and both the output topic state and the committed offsets
correspond to the state at the moment of some punctuation. Meaning that
if I have a deterministic processing topology, my output topic is going
to be deterministic as well (modulo duplicates, of course). Am I
correct here?
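
To make the idea concrete, something like the following -- note that
this commit(Map) overload is purely hypothetical and does not exist in
the current ProcessorContext API (it assumes a `context` and a
watermark variable as available inside a Processor):

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Hypothetical: attach per-partition metadata to the next commit that
// Streams performs for this task; called from within punctuate().
Map<TopicPartition, String> metadata = Collections.singletonMap(
        new TopicPartition("events", 0),
        Long.toString(eventTimeHighWatermark));
context.commit(metadata);  // hypothetical overload -- does not exist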

Best regards,
Stanislav.




Re: committing offset metadata in kafka streams

2018-03-04 Thread Matthias J. Sax
You are correct. This is not possible atm.

Note that commits happen "under the hood" and users cannot commit
explicitly. Users can only "request" a commit -- this implies that
Kafka Streams will commit as soon as possible -- but when
`context#commit()` returns, the commit is not done yet (it only sets a
flag).
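
For illustration, a minimal sketch of requesting commits from a
wall-clock punctuation (the class name and interval are made up):

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class CommitRequestingProcessor
        extends AbstractProcessor<String, String> {

    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        // Ask Streams to commit roughly once per minute of wall-clock
        // time; commit() only sets a flag, and the actual commit
        // happens later, after the punctuation returns.
        context.schedule(60_000L, PunctuationType.WALL_CLOCK_TIME,
                timestamp -> context().commit());
    }

    @Override
    public void process(String key, String value) {
        // normal record processing; commit() could also be requested here
    }
}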

What is your use case for this? How would you want to use this from an
API point of view?

Feel free to open a feature request JIRA -- we don't have any plans to
add this atm, and it's the first time anybody has asked for this
feature. If there is a JIRA, maybe somebody will pick it up :)


-Matthias






committing offset metadata in kafka streams

2018-03-03 Thread Stas Chizhov
Hi,

There seems to be no way to commit custom metadata along with offsets from
within Kafka Streams.
Are there any plans to expose this functionality or have I missed something?

Best regards,
Stanislav.