Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-06 Guozhang Wang
Thanks for creating the JIRA ticket. The Streams library follows the "event-time" concept by default with the metadata timestamp extractor, expecting the timestamp set in this field to reflect "when the event happens in real time":

Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-06 Dmitriy Vsekhvalnov
Guozhang, here we go with the ticket: https://issues.apache.org/jira/browse/KAFKA-6614. I'd also like to continue the discussion a little further about timestamps. I was trying to test with the broker configured with "CreateTime" and got a question about sink topic timestamps; back to the example: KTable

Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-05 Guozhang Wang
Sounds great! :) On Mon, Mar 5, 2018 at 12:28 PM, Dmitriy Vsekhvalnov wrote: > Thanks, that's an option, I'll take a look at the configuration. > > But yeah, I was thinking the same: if Streams relies on the fact that internal > topics should use 'CreateTime' configuration,

Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-05 Dmitriy Vsekhvalnov
Thanks, that's an option, I'll take a look at the configuration. But yeah, I was thinking the same: if Streams relies on the fact that internal topics should use the 'CreateTime' configuration, then it is the Streams library's responsibility to configure it. I can open a JIRA ticket :) On Mon, Mar 5, 2018 at

Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-05 Guozhang Wang
Hello Dmitriy, in your case you can override this config to CreateTime only for the internal topics created by Streams; this is documented in https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/StreamsConfig.html#TOPIC_PREFIX. We are also discussing always overriding the
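
A minimal sketch of the override described above, assuming the standard `topic.` prefix that `StreamsConfig.TOPIC_PREFIX` stands for. In a real application the key would normally be built with `StreamsConfig.topicPrefix(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG)`; here it is spelled out as a plain string so the snippet compiles without the Kafka jars. The class name and the example application id are hypothetical:

```java
import java.util.Properties;

// Sketch: Streams properties asking Kafka Streams to create its internal
// (repartition/changelog) topics with message.timestamp.type=CreateTime.
final class InternalTopicTimestampConfig {

    // Same string as StreamsConfig.TOPIC_PREFIX + TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG,
    // written as a literal so this sketch needs no Kafka dependency.
    static final String INTERNAL_TOPIC_TIMESTAMP_TYPE = "topic." + "message.timestamp.type";

    static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Applied only to topics that Streams itself creates; source and sink
        // topics keep whatever the broker or topic configuration says.
        props.setProperty(INTERNAL_TOPIC_TIMESTAMP_TYPE, "CreateTime");
        return props;
    }
}
```

Because a topic-level `message.timestamp.type` takes precedence over the broker-wide `log.message.timestamp.type`, this targets exactly the internal topics without touching the broker default.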

Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-05 Dmitriy Vsekhvalnov
Which effectively means the given scenario does not work with LogAppendTime, correct? Because all internal re-partition topics will always contain "now" instead of the real timestamp from the original payload message? Is kafka-streams designed to work with LogAppendTime at all? It seems a lot of stuff will

Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-05 Guozhang Wang
If the broker configures log.message.timestamp.type=LogAppendTime universally, it will ignore whatever timestamp is set in the message metadata and override it with the append time. So when the messages are fetched by downstream processors, which always use the metadata timestamp extractor, they will get
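
To make the effect concrete, here is a toy model (plain Java, not broker code) of the behaviour described above, assuming epoch-aligned one-hour tumbling windows. `BrokerTimestampModel` and its methods are hypothetical names:

```java
// Toy model, not Kafka code: which timestamp ends up in the log depending on
// the topic's message.timestamp.type, and which hourly window it falls into.
final class BrokerTimestampModel {

    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    static final long HOUR_MS = 3_600_000L;

    // With LogAppendTime the producer-supplied value is discarded and
    // replaced by the broker's clock; with CreateTime it is kept.
    static long storedTimestamp(long producerTimestamp, TimestampType type, long brokerNowMs) {
        return type == TimestampType.LOG_APPEND_TIME ? brokerNowMs : producerTimestamp;
    }

    // Start of the epoch-aligned one-hour tumbling window containing ts.
    static long hourWindowStart(long ts) {
        return ts - Math.floorMod(ts, HOUR_MS);
    }
}
```

For example, an event with time 1520337000000L (2018-03-06 11:50 UTC) appended at 1520337900000L (12:05 UTC) gets `hourWindowStart` 1520334000000L (the 11:00 window) under CreateTime, but 1520337600000L (the 12:00 window) under LogAppendTime.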

Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-05 Dmitriy Vsekhvalnov
Hi Guozhang, interesting. Will the same logic (internal topic rewrite) apply to brokers configured with log.message.timestamp.type=LogAppendTime? On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang wrote: > Hello Dmitriy, > > What you have observed is by design, and it may be

Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-05 Guozhang Wang
Hello Dmitriy, what you have observed is by design, and it may be a bit confusing at first. Let me explain: when you do a group-by aggregation like the case above, during the "groupBy((key, value) -> ..)" stage the Streams library will re-partition by sending the original data
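
A rough model of the re-partitioning step, written against plain Java collections rather than Kafka Streams: each record is re-keyed by the grouping function and routed to a partition of a simulated repartition topic, keeping its timestamp. Per the later messages in this thread, that timestamp travels in the record metadata, which is exactly what a LogAppendTime broker would overwrite. `RepartitionModel`, `Rec`, and the use of `String.hashCode()` in place of Kafka's murmur2-based default partitioner are all assumptions for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy model of groupBy re-partitioning; not the Kafka Streams implementation.
final class RepartitionModel {

    // A record with key, value and its (already extracted) timestamp.
    static final class Rec {
        final String key;
        final String value;
        final long timestamp;

        Rec(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Re-key each record and append it to one partition of the simulated
    // repartition topic. The timestamp is copied unchanged into the new record.
    static List<List<Rec>> repartition(List<Rec> input, Function<Rec, String> newKey, int partitions) {
        List<List<Rec>> topic = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            topic.add(new ArrayList<>());
        }
        for (Rec r : input) {
            String k = newKey.apply(r);
            // Hypothetical stand-in for Kafka's default partitioner.
            int p = Math.floorMod(k.hashCode(), partitions);
            topic.get(p).add(new Rec(k, r.value, r.timestamp));
        }
        return topic;
    }
}
```

Records that share a grouping key always land in the same partition, which is what lets the downstream aggregation see all of them; the time window they are counted in depends only on the timestamp that survives this hop.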

kafka-streams, late data, tumbling windows aggregations and event time

2018-03-05 Dmitriy Vsekhvalnov
Good morning, we have a simple use case where we want to count the number of events per hour, grouped by some fields from the event itself. Our event timestamp is embedded in the message itself (JSON), and we are using a trivial custom timestamp extractor (which is called and works as expected). What we are facing
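
A stdlib-only sketch of this use case, assuming hypothetical JSON fields `ts` (epoch milliseconds) and `country`, and using a regex in place of a real JSON parser. In an actual application the extraction logic would live in an implementation of `org.apache.kafka.streams.processor.TimestampExtractor`, and the counting would come from a `groupBy(...)` followed by a one-hour `TimeWindows` aggregation; this code only mirrors the arithmetic:

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch only: field names "ts" and "country" are hypothetical.
final class HourlyEventCounter {

    static final long HOUR_MS = 3_600_000L;
    private static final Pattern TS = Pattern.compile("\"ts\"\\s*:\\s*(\\d+)");
    private static final Pattern COUNTRY = Pattern.compile("\"country\"\\s*:\\s*\"([^\"]*)\"");

    // Event time embedded in the payload, in epoch milliseconds; returns the
    // fallback (e.g. the record's metadata timestamp) if the field is absent.
    static long extractEventTime(String json, long fallback) {
        Matcher m = TS.matcher(json);
        return m.find() ? Long.parseLong(m.group(1)) : fallback;
    }

    // Grouping field taken from the event itself.
    static String groupKey(String json) {
        Matcher m = COUNTRY.matcher(json);
        return m.find() ? m.group(1) : "unknown";
    }

    // Counts events per "<group>@<windowStart>", choosing the one-hour
    // tumbling window from event time rather than arrival time.
    static Map<String, Long> countPerHour(Iterable<String> events) {
        Map<String, Long> counts = new TreeMap<>();
        for (String json : events) {
            long ts = extractEventTime(json, -1L);
            if (ts < 0) {
                continue; // this sketch skips events without a usable timestamp
            }
            long windowStart = ts - Math.floorMod(ts, HOUR_MS);
            counts.merge(groupKey(json) + "@" + windowStart, 1L, Long::sum);
        }
        return counts;
    }
}
```

Skipping events without a timestamp is a choice made for this sketch only; a production extractor might fall back to the record metadata or fail loudly instead.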