Thanks for the explanation.

Not sure if setting the metadata you want to get committed in
punctuate() would be sufficient. But I would think about it in more
detail if we get a KIP for this.
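
To make the idea concrete, here is a toy model in plain Java with no Kafka dependency. It sketches what a partition-to-metadata commit request made from punctuate() might look like. Every name in it (`HypotheticalContext`, `requestCommit(Map)`, `maybeCommit()`, `Committed`) is hypothetical, and Kafka Streams has no such API. The model follows the thread's description of `context#commit()`: the request only stashes the metadata and sets a flag, and the actual commit happens later.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the API idea discussed here. None of these types exist in Kafka Streams.
class MetadataCommitSketch {
    /** What a commit writes for one partition: the next offset to consume plus a metadata string. */
    record Committed(long nextOffset, String metadata) {}

    static final class HypotheticalContext {
        private final Map<Integer, Long> nextOffsets = new HashMap<>();
        private final Map<Integer, String> pendingMetadata = new HashMap<>();
        private boolean commitRequested = false;
        final Map<Integer, Committed> committed = new HashMap<>();

        /** Called by the runtime after a record from the given partition has been processed. */
        void recordProcessed(int partition, long offset) {
            nextOffsets.put(partition, offset + 1);
        }

        /** Would be called from punctuate(): stashes the metadata and sets a flag; nothing is committed yet. */
        void requestCommit(Map<Integer, String> metadataByPartition) {
            pendingMetadata.putAll(metadataByPartition);
            commitRequested = true;
        }

        /** Called by the runtime at some later point; only here is anything actually committed. */
        void maybeCommit() {
            if (!commitRequested) {
                return;
            }
            for (Map.Entry<Integer, Long> e : nextOffsets.entrySet()) {
                committed.put(e.getKey(), new Committed(e.getValue(), pendingMetadata.get(e.getKey())));
            }
            pendingMetadata.clear();
            commitRequested = false;
        }
    }

    public static void main(String[] args) {
        HypotheticalContext ctx = new HypotheticalContext();
        ctx.recordProcessed(0, 41);
        // punctuate() fires here and requests a commit with a watermark for partition 0
        ctx.requestCommit(Map.of(0, "eventTimeHighWatermark=1000"));
        // the runtime processes one more record before it gets to the commit
        ctx.recordProcessed(0, 42);
        ctx.maybeCommit();
        Committed c = ctx.committed.get(0);
        System.out.println(c.nextOffset() + " " + c.metadata()); // 43 eventTimeHighWatermark=1000
    }
}
```

In this model, offset 43 is committed with metadata computed before record 42 was processed. A real design would have to decide whether that is acceptable.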

It's correct that flushing and committing offsets are correlated. But
they are not tied to punctuation.
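
A toy single-partition timeline can illustrate the point. It assumes separate, fixed schedules for commits and punctuation. The 25 s and 10 s intervals are arbitrary; in Streams the regular commit schedule is governed by `commit.interval.ms`. The model treats a commit as "flush, then commit offsets". It is not Streams' actual run loop.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: one partition, one clock, fixed schedules. Not the Kafka Streams run loop.
class CommitVsPunctuation {
    // Arbitrary illustrative intervals; in Streams the commit schedule comes from commit.interval.ms.
    static final long PUNCTUATE_EVERY_MS = 10_000;
    static final long COMMIT_EVERY_MS = 25_000;

    /** Processes one record per arrival time and logs punctuations and commits as they fire. */
    static List<String> simulate(long[] arrivalTimesMs) {
        List<String> log = new ArrayList<>();
        long nextPunctuate = PUNCTUATE_EVERY_MS;
        long nextCommit = COMMIT_EVERY_MS;
        long processed = 0; // offsets start at 0, so this is also the next offset to commit
        for (long now : arrivalTimesMs) {
            processed++; // "process" the record
            if (now >= nextPunctuate) {
                log.add("punctuate at " + now + " after " + processed + " records");
                nextPunctuate += PUNCTUATE_EVERY_MS;
            }
            if (now >= nextCommit) {
                // Flushing and committing happen together, on the commit schedule only.
                log.add("flush + commit offset " + processed + " at " + now);
                nextCommit += COMMIT_EVERY_MS;
            }
        }
        return log;
    }

    public static void main(String[] args) {
        simulate(new long[] {4_000, 11_000, 19_000, 22_000, 26_000}).forEach(System.out::println);
    }
}
```

Here the commit at 26 s covers five records, while the most recent punctuation ran after four. The committed state therefore does not line up with any punctuation.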

Also note that the processing order might differ slightly if you
process the same data twice (it depends on the order in which the brokers
return data on poll(), and that is something Streams cannot fully
control). Thus, your code would need to be "robust" against different
processing orders (i.e., if there are multiple input partitions, you might
get data first for partition 0 and then for partition 1, or the other
way round -- only the order within each partition is guaranteed to be offset order).
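
The following minimal sketch shows what "robust" can mean. It assumes the state being tracked is an event-time high watermark, and the record type and method names are made up for illustration. A per-partition maximum depends only on each partition's own records, which always arrive in offset order. A value like "event time of the last record seen" instead depends on how the partitions happen to be interleaved.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: Rec and both methods are hypothetical, not Kafka Streams types.
class OrderRobustness {
    record Rec(int partition, long offset, long eventTime) {}

    /** Order-robust: the max event time per partition ignores cross-partition interleaving. */
    static Map<Integer, Long> perPartitionHighWatermark(List<Rec> records) {
        Map<Integer, Long> hw = new HashMap<>();
        for (Rec r : records) {
            hw.merge(r.partition(), r.eventTime(), Long::max);
        }
        return hw;
    }

    /** Order-sensitive: depends on which partition's record happened to come last. */
    static long lastSeenEventTime(List<Rec> records) {
        return records.get(records.size() - 1).eventTime();
    }

    public static void main(String[] args) {
        Rec a0 = new Rec(0, 0, 100), a1 = new Rec(0, 1, 300);
        Rec b0 = new Rec(1, 0, 200), b1 = new Rec(1, 1, 250);
        // Same offset order within each partition, different interleaving across partitions.
        List<Rec> run1 = List.of(a0, a1, b0, b1);
        List<Rec> run2 = List.of(b0, a0, b1, a1);
        System.out.println(perPartitionHighWatermark(run1).equals(perPartitionHighWatermark(run2))); // true
        System.out.println(lastSeenEventTime(run1) + " vs " + lastSeenEventTime(run2)); // 250 vs 300
    }
}
```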


-Matthias



On 3/6/18 2:17 AM, Stas Chizhov wrote:
> Thank you, Matthias!
> 
> We currently use a Kafka consumer and store event-time high watermarks as
> offset metadata. This is used during our backup procedure, which creates a
> snapshot of the target storage containing all events up to a certain
> timestamp and no others.
> 
> As for the API - I guess being able to provide a partition-to-metadata map
> to the context's commit method would do it (to be called from within the
> punctuate method). BTW, as far as I understand, if the Processor API is
> used, flushing producers and committing offsets are correlated, and both the
> output topic state and the committed offsets correspond to the state at the
> moment of some punctuation. This would mean that if I have a deterministic
> processing topology, my output topic is going to be deterministic as well
> (modulo duplicates, of course). Am I correct here?
> 
> Best regards,
> Stanislav.
> 
> 
> 2018-03-05 2:31 GMT+01:00 Matthias J. Sax <matth...@confluent.io>:
> 
>> You are correct. This is not possible atm.
>>
>> Note that commits happen "under the hood" and users cannot commit
>> explicitly. Users can only "request" a commit -- this implies that
>> Kafka Streams will commit as soon as possible -- but when
>> `context#commit()` returns, the commit is not done yet (it only sets a
>> flag).
>>
>> What is your use case for this? How would you want to use this from an
>> API point of view?
>>
>> Feel free to open a feature request JIRA -- we don't have any plans to
>> add this atm -- it's the first time anybody has asked for this feature. If
>> there is a JIRA, maybe somebody will pick it up :)
>>
>>
>> -Matthias
>>
>> On 3/3/18 6:51 AM, Stas Chizhov wrote:
>>> Hi,
>>>
>>> There seems to be no way to commit custom metadata along with offsets
>>> from within Kafka Streams.
>>> Are there any plans to expose this functionality or have I missed
>>> something?
>>>
>>> Best regards,
>>> Stanislav.
>>>
>>
>>
> 
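
Stanislav's current consumer-based approach stores an event-time high watermark in the offset metadata, which is a plain string. Below is a minimal encode/decode sketch. It assumes a made-up `ethw=<epoch millis>` format, because the thread does not say which format is actually used. With a plain consumer, the encoded string would be passed as the `metadata` argument of `new OffsetAndMetadata(offset, metadata)` in a `commitSync(...)` call. The sketch itself has no Kafka dependency.

```java
import java.util.OptionalLong;

// Hypothetical metadata format "ethw=<epoch millis>"; the thread does not specify the real one.
class WatermarkMetadata {
    private static final String PREFIX = "ethw=";

    /** Encodes an event-time high watermark (epoch millis) as an offset-metadata string. */
    static String encode(long eventTimeHighWatermarkMs) {
        return PREFIX + eventTimeHighWatermarkMs;
    }

    /** Decodes the watermark; empty if the metadata is missing or not in the expected format. */
    static OptionalLong decode(String metadata) {
        if (metadata == null || !metadata.startsWith(PREFIX)) {
            return OptionalLong.empty();
        }
        try {
            return OptionalLong.of(Long.parseLong(metadata.substring(PREFIX.length())));
        } catch (NumberFormatException e) {
            return OptionalLong.empty();
        }
    }

    public static void main(String[] args) {
        String md = encode(1_520_000_000_000L);
        System.out.println(md + " -> " + decode(md).getAsLong());
    }
}
```

Returning an empty `OptionalLong` for unparseable metadata lets a backup job treat partitions without a recorded watermark explicitly, rather than failing on them.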
