Re: Shouldn't KStream#through let you specify a Consumed?

2018-02-03 Thread Matthias J. Sax
With "default" I mean "FailOnInvalidTimestampExtractor" that is set as
config if you don't overwrite it. Thus, for through() the setting in
StreamsConfig is ignored if you set a custom extractor.
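
For reference, a minimal sketch (against the 1.0 API; application id and
bootstrap servers are placeholders) of where that default lives in the config:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;

public class DefaultExtractorConfig {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // FailOnInvalidTimestamp is already the default; setting it explicitly here
        // only spells out the behavior that through() relies on.
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                  FailOnInvalidTimestamp.class);
        return props;
    }
}
```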

>> So, KStream.through() and KStream.to()-Topology.stream() are not
>> semantically equivalent? Is there anything wrong with using the latter in
>> order to specify a timestamp extractor?

That's correct. `through()` forces "FailOnInvalidTimestamp", while
`to()`-`stream()` would be a workaround that allows you to set a custom extractor.

It might be incorrect if you set a custom extractor. The semantics are
based on timestamps, and a program should compute the same result
independently of the presence of intermediate topics. Forcing
`FailOnInvalidTimestamp`, which extracts the record metadata timestamp,
ensures correctness, as the timestamp of the record that is written and the
timestamp that is read back are the same.

If you know what you are doing, it's of course ok to overwrite the extractor.
But by default it should not be required (and it is potentially incorrect).
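
For illustration, a rough sketch of the `to()`-`stream()` workaround (1.0 API;
topic names are placeholders, and `WallclockTimestampExtractor` merely stands
in for whatever custom extractor you would plug in):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

public class ThroughWorkaround {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> upstream = builder.stream("input-topic");  // placeholder topic

        // through("intermediate-topic") split into to() + stream(), so that a Consumed
        // (and with it a custom TimestampExtractor) can be supplied on the read side.
        upstream.to("intermediate-topic", Produced.with(Serdes.String(), Serdes.String()));
        KStream<String, String> downstream = builder.stream(
                "intermediate-topic",
                Consumed.with(Serdes.String(), Serdes.String())
                        // stand-in for "some custom extractor"
                        .withTimestampExtractor(new WallclockTimestampExtractor()));

        downstream.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));  // placeholder sink
        return builder.build();
    }
}
```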

>> My motivation is to "fix" timestamps that result from out-of-order
>> operations until https://issues.apache.org/jira/browse/KAFKA-6454 arrives.

Sounds like a hack :) Note that KAFKA-6454 targets the PAPI. We plan to use
it in the DSL, but we don't plan to expose it to the user in the DSL. There
will be another KIP with more details... Stay tuned. I hope this will resolve
the issue you are facing atm so that you don't need to "fix" timestamps
manually any longer.


-Matthias



On 2/2/18 7:24 PM, Dmitry Minkovsky wrote:
>> it would be invalid to specify a custom extractor (for correctness
>> reasons, we enforce the default timestamp extractor)
> 
> "default timestamp extractor" meaning the one that was used when reading
> the input record?
> 
> So, KStream.through() and KStream.to()-Topology.stream() are not
> semantically equivalent? Is there anything wrong with using the latter in
> order to specify a timestamp extractor?
> 
> My motivation is to "fix" timestamps that result from out-of-order
> operations until https://issues.apache.org/jira/browse/KAFKA-6454 arrives.
> 
> On Fri, Feb 2, 2018 at 7:22 PM, Matthias J. Sax wrote:
> 
>> It's not required to specify Consumed:
>>
>> - the specified Serdes from Produced are used for the consumer, too
>> - it would be invalid to specify a custom extractor (for correctness
>> reasons, we enforce the default timestamp extractor)
>>
>> Note, that Streams API sets the metadata record timestamp on write, and
>> thus, using the default timestamp extractor ensures that this timestamp
>> is correctly reused when reading data back.
>>
>>
>>
>> -Matthias
>>
>> On 2/2/18 3:54 PM, Dmitry Minkovsky wrote:
>>> `KStream#through()` currently lets you specify a `Produced`. Shouldn't it
>>> also let you specify a `Consumed`? This would let you specify a timestamp
>>> extractor.
>>>
>>
>>
> 




Re: Shouldn't KStream#through let you specify a Consumed?

2018-02-02 Thread Dmitry Minkovsky
> it would be invalid to specify a custom extractor (for correctness
> reasons, we enforce the default timestamp extractor)

"default timestamp extractor" meaning the one that was used when reading
the input record?

So, KStream.through() and KStream.to()-Topology.stream() are not
semantically equivalent? Is there anything wrong with using the latter in
order to specify a timestamp extractor?

My motivation is to "fix" timestamps that result from out-of-order
operations until https://issues.apache.org/jira/browse/KAFKA-6454 arrives.
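
For illustration, the kind of "fix" meant here might be a custom
TimestampExtractor roughly like the following (a hypothetical sketch; the
clamping rule is made up for illustration and is not part of any Kafka API):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Hypothetical extractor that "repairs" out-of-order timestamps by never
// letting them move backwards relative to the highest timestamp seen so far
// on the record's partition.
public class MonotonicTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        long ts = record.timestamp();
        // If the record's timestamp is older than what we've already seen,
        // fall back to the previously extracted (highest) timestamp.
        return (ts < previousTimestamp) ? previousTimestamp : ts;
    }
}
```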

On Fri, Feb 2, 2018 at 7:22 PM, Matthias J. Sax wrote:

> It's not required to specify Consumed:
>
> - the specified Serdes from Produced are used for the consumer, too
> - it would be invalid to specify a custom extractor (for correctness
> reasons, we enforce the default timestamp extractor)
>
> Note, that Streams API sets the metadata record timestamp on write, and
> thus, using the default timestamp extractor ensures that this timestamp
> is correctly reused when reading data back.
>
>
>
> -Matthias
>
> On 2/2/18 3:54 PM, Dmitry Minkovsky wrote:
> > `KStream#through()` currently lets you specify a `Produced`. Shouldn't it
> > also let you specify a `Consumed`? This would let you specify a timestamp
> > extractor.
> >
>
>


Re: Shouldn't KStream#through let you specify a Consumed?

2018-02-02 Thread Matthias J. Sax
It's not required to specify Consumed:

- the specified Serdes from Produced are used for the consumer, too
- it would be invalid to specify a custom extractor (for correctness
reasons, we enforce the default timestamp extractor)

Note that the Streams API sets the record metadata timestamp on write, and
thus using the default timestamp extractor ensures that this timestamp is
correctly reused when reading the data back.
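
For example, a rough sketch (topic names are placeholders) of how the
`Produced` handed to `through()` covers both the write and the read side of
the intermediate topic:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class ThroughExample {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Long> input = builder.stream("input-topic");  // placeholder topic

        // The Serdes given via Produced are used both for writing to and for
        // reading back from "repartition-topic"; the read side keeps the
        // default (fail-on-invalid-timestamp) extractor.
        KStream<String, Long> repartitioned =
                input.through("repartition-topic", Produced.with(Serdes.String(), Serdes.Long()));

        repartitioned.to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));  // placeholder sink
        return builder.build();
    }
}
```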



-Matthias

On 2/2/18 3:54 PM, Dmitry Minkovsky wrote:
> `KStream#through()` currently lets you specify a `Produced`. Shouldn't it
> also let you specify a `Consumed`? This would let you specify a timestamp
> extractor.
> 


