Re: kafka-streams, late data, tumbling windows aggregations and event time
Thanks for creating the JIRA ticket. The Streams library follows the "event-time" concept by default with the metadata timestamp extractor, expecting the timestamp set in this field to reflect when the event happened in real time: https://kafka.apache.org/10/documentation/streams/core-concepts#streams_time

Following that expectation, the timestamp Streams uses for windowed aggregation results is the window start time, indicating "events that happened during this window in real time resulted in this aggregated value".

Guozhang
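The window-start convention described above can be illustrated with plain arithmetic. This is only a sketch of tumbling-window assignment for non-negative timestamps, not the actual Streams internals; the class and method names are made up for illustration:

```java
public class TumblingWindows {
    // For tumbling windows, the window that contains a record is fully
    // determined by the record's event-time timestamp and the window size;
    // the aggregation result then carries this window start as its timestamp.
    static long windowStartFor(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long oneHour = 3_600_000L;
        // An event at 10:45:12 falls into the [10:00, 11:00) window, so the
        // windowed count for it is emitted with the 10:00 timestamp, which is
        // "in the past" relative to processing time.
        long eventTs = 10 * oneHour + 45 * 60_000L + 12_000L;
        System.out.println(windowStartFor(eventTs, oneHour)); // 36000000, i.e. 10:00
    }
}
```

This is why sink records produced from a windowed KTable carry timestamps that often lie in the past: the result timestamp describes the window, not the wall-clock time of emission.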
Re: kafka-streams, late data, tumbling windows aggregations and event time
Guozhang,

here we go with the ticket: https://issues.apache.org/jira/browse/KAFKA-6614

I'd also like to continue the discussion a little further about timestamps. I was testing with the broker configured for "CreateTime" and have a question about sink topic timestamps. Back to the example:

    KTable summaries = in
        .groupBy((key, value) -> ..)
        .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1L)))
        .count();

    summaries.toStream().to(sink);

Each record written to the sink gets a timestamp equal to the grouping window start time, which quite often will be in the past.

What is the logic behind that? Honestly, I expected sink messages to get a "now" timestamp.
Re: kafka-streams, late data, tumbling windows aggregations and event time
Sounds great! :)
Re: kafka-streams, late data, tumbling windows aggregations and event time
Thanks, that's an option; I'll take a look at the configuration.

But yeah, I was thinking the same: if Streams relies on the fact that internal topics should use the 'CreateTime' configuration, then it is the Streams library's responsibility to configure it.

I can open a Jira ticket :)
Re: kafka-streams, late data, tumbling windows aggregations and event time
Hello Dmitriy,

In your case, you can override this config to CreateTime for only the internal topics created by Streams; this is documented in

https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/StreamsConfig.html#TOPIC_PREFIX

We are also discussing always overriding the log.message.timestamp.type config for internal topics to CreateTime. I vaguely remember there is a JIRA open for it, in case you are interested in contributing to the Streams library.

Guozhang
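As a sketch, the override mentioned above boils down to prefixing a topic-level config with "topic." in the Streams properties. This assumes the Kafka 1.0 StreamsConfig behaviour (the "topic." prefix and the topicPrefix helper); the application id and bootstrap servers here are placeholders:

```java
import java.util.Properties;

public class InternalTopicTimestampType {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "hourly-counts");     // placeholder
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        // Configs prefixed with "topic." (StreamsConfig.TOPIC_PREFIX) are
        // applied to the internal repartition/changelog topics Streams creates:
        props.put("topic.message.timestamp.type", "CreateTime");
        // Equivalent, using the helper:
        //   props.put(StreamsConfig.topicPrefix("message.timestamp.type"), "CreateTime");
        System.out.println(props.getProperty("topic.message.timestamp.type"));
    }
}
```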
Re: kafka-streams, late data, tumbling windows aggregations and event time
Which effectively means the given scenario does not work with LogAppendTime, correct? Because all internal re-partition topics will always contain "now" instead of the real timestamp from the original payload message?

Is kafka-streams designed to work with LogAppendTime at all? It seems a lot of stuff will NOT work correctly using the built-in ExtractRecordMetadataTimestamp?
Re: kafka-streams, late data, tumbling windows aggregations and event time
If the broker configures log.message.timestamp.type=LogAppendTime universally, it will ignore whatever timestamp is set in the message metadata and override it with the append time. So when the messages are fetched by downstream processors, which always use the metadata timestamp extractor, they will get the append timestamp set by the brokers.

Guozhang
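The practical effect on windowing can be sketched numerically. This is an illustration only, with made-up timestamps and the usual tumbling-window start arithmetic, not Streams code:

```java
public class LogAppendTimePitfall {
    // Tumbling-window assignment from whatever timestamp the
    // metadata extractor returns.
    static long windowStart(long tsMs, long windowSizeMs) {
        return tsMs - (tsMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        long eventTs  = 10 * hour + 15 * 60_000L; // event happened at 10:15
        long appendTs = 12 * hour +  1 * 60_000L; // broker appended it at 12:01
        // With CreateTime the record metadata carries eventTs -> 10:00 window:
        System.out.println(windowStart(eventTs, hour));  // 36000000 (10:00)
        // With LogAppendTime the broker overwrote it -> 12:00 window,
        // so late data lands in the "now" bucket instead of its real hour:
        System.out.println(windowStart(appendTs, hour)); // 43200000 (12:00)
    }
}
```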
Re: kafka-streams, late data, tumbling windows aggregations and event time
Hi Guozhang,

interesting, will the same logic apply (internal topic rewrite) for brokers configured with:
log.message.timestamp.type=LogAppendTime
?

On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang wrote:
> Hello Dmitriy,
>
> What you have observed is by design, and it may be a bit confusing at
> first. Let me explain:
>
> When you do a group-by aggregation like the above case, during the
> "groupBy((key, value) -> ..)" stage the Streams library will do a
> re-partitioning by sending the original data stream into an internal
> repartition topic, based on the aggregation key defined in the "groupBy"
> function, and fetching from that topic again. This is similar to a shuffle
> phase in distributed computing frameworks, to make sure the downstream
> aggregations can be done in parallel. When the "groupBy" operator sends
> the messages to this repartition topic, it sets in the record metadata the
> timestamp extracted from the payload, and hence when the downstream
> aggregation operator reads from this repartition topic, it is OK to always
> use the ExtractRecordMetadataTimestamp to extract that timestamp and use
> the extracted value to determine which window this record should fall into.
>
> More details can be found in this JIRA:
>
> https://issues.apache.org/jira/browse/KAFKA-4785
>
> So the record timestamp used during aggregation should be the same as the
> one in the payload; if you do observe that is not the case, this is
> unexpected. In that case could you share your complete code snippet,
> especially how the input stream "in" is defined, and your config
> properties, for us to investigate?
>
> Guozhang
>
> On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov <
> dvsekhval...@gmail.com> wrote:
>
> > Good morning,
> >
> > we have a simple use-case where we want to count the number of events
> > in each hour, grouped by some fields from the event itself.
> >
> > Our event timestamp is embedded in the message itself (JSON) and we are
> > using a trivial custom timestamp extractor (which is called and works
> > as expected).
> >
> > What we are facing is that the timestamp coming from
> > ExtractRecordMetadataTimestamp is always used when determining matching
> > windows for an event, inside KStreamWindowAggregate.process(), and never
> > the value from our JSON timestamp extractor.
> >
> > Effectively it doesn't work correctly when we test on late data, e.g. a
> > message timestamp an hour before now. The topology always calculates the
> > matching hour bucket (window) using the record timestamp, not the
> > payload.
> >
> > Is it expected behaviour? Are we getting windowing wrong? Any settings
> > or other tricks to accommodate our use-case?
> >
> > For reference, our setup: brokers, kafka-streams and kafka-clients all
> > of v1.0.0. And here is the code:
> >
> > KTable summaries = in
> >     .groupBy((key, value) -> ..)
> >     .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1L)))
> >     .count();
> >
> > Thank you.
>
> --
> -- Guozhang
>
Re: kafka-streams, late data, tumbling windows aggregations and event time
Hello Dmitriy,

What you have observed is by design, and it may be a bit confusing at first. Let me explain:

When you do a group-by aggregation like the above case, during the "groupBy((key, value) -> ..)" stage the Streams library will do a re-partitioning by sending the original data stream into an internal repartition topic, based on the aggregation key defined in the "groupBy" function, and fetching from that topic again. This is similar to a shuffle phase in distributed computing frameworks, to make sure the downstream aggregations can be done in parallel. When the "groupBy" operator sends the messages to this repartition topic, it sets in the record metadata the timestamp extracted from the payload, and hence when the downstream aggregation operator reads from this repartition topic, it is OK to always use the ExtractRecordMetadataTimestamp to extract that timestamp and use the extracted value to determine which window this record should fall into.

More details can be found in this JIRA:

https://issues.apache.org/jira/browse/KAFKA-4785

So the record timestamp used during aggregation should be the same as the one in the payload; if you do observe that is not the case, this is unexpected. In that case could you share your complete code snippet, especially how the input stream "in" is defined, and your config properties, for us to investigate?

Guozhang

On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov wrote:
> Good morning,
>
> we have a simple use-case where we want to count the number of events in
> each hour, grouped by some fields from the event itself.
>
> Our event timestamp is embedded in the message itself (JSON) and we are
> using a trivial custom timestamp extractor (which is called and works as
> expected).
>
> What we are facing is that the timestamp coming from
> ExtractRecordMetadataTimestamp is always used when determining matching
> windows for an event, inside KStreamWindowAggregate.process(), and never
> the value from our JSON timestamp extractor.
>
> Effectively it doesn't work correctly when we test on late data, e.g. a
> message timestamp an hour before now. The topology always calculates the
> matching hour bucket (window) using the record timestamp, not the payload.
>
> Is it expected behaviour? Are we getting windowing wrong? Any settings or
> other tricks to accommodate our use-case?
>
> For reference, our setup: brokers, kafka-streams and kafka-clients all of
> v1.0.0. And here is the code:
>
> KTable summaries = in
>     .groupBy((key, value) -> ..)
>     .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1L)))
>     .count();
>
> Thank you.

--
-- Guozhang
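The window assignment described above can be sketched with plain arithmetic: tumbling windows of a fixed size, aligned to the epoch, where a record falls into the window starting at its timestamp rounded down to a multiple of the window size. This is a simplified model for illustration, not the actual TimeWindows implementation:

```java
public class WindowBucket {
    // Tumbling windows: a record with timestamp ts (epoch millis) falls
    // into the window starting at ts - (ts % size). Simplified model of
    // epoch-aligned one-hour windows as in TimeWindows.of(3600000L).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        long hour = 60 * 60 * 1000L;
        long now = 1520260000000L;          // fixed example instant
        long late = now - 90 * 60 * 1000L;  // event from 90 minutes earlier
        // The late event lands in an earlier hour bucket, but only if the
        // timestamp used here is the payload one, not the append time.
        System.out.println(windowStart(late, hour)); // prints 1520251200000
    }
}
```

This is why the choice of timestamp matters: feed windowStart the broker's append time instead of the payload time and every late event collapses into the current hour's bucket.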
kafka-streams, late data, tumbling windows aggregations and event time
Good morning,

we have a simple use-case where we want to count the number of events in each hour, grouped by some fields from the event itself.

Our event timestamp is embedded in the message itself (JSON) and we are using a trivial custom timestamp extractor (which is called and works as expected).

What we are facing is that the timestamp coming from ExtractRecordMetadataTimestamp is always used when determining matching windows for an event, inside KStreamWindowAggregate.process(), and never the value from our JSON timestamp extractor.

Effectively it doesn't work correctly when we test on late data, e.g. a message timestamp an hour before now. The topology always calculates the matching hour bucket (window) using the record timestamp, not the payload.

Is it expected behaviour? Are we getting windowing wrong? Any settings or other tricks to accommodate our use-case?

For reference, our setup: brokers, kafka-streams and kafka-clients all of v1.0.0. And here is the code:

KTable summaries = in
    .groupBy((key, value) -> ..)
    .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1L)))
    .count();

Thank you.
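The "trivial custom timestamp extractor" mentioned above is not shown; a minimal stand-in for its payload-parsing part can be sketched in plain Java. The field name "ts" and the regex-based parsing are assumptions for illustration only — a real extractor would implement Kafka Streams' TimestampExtractor interface and use a proper JSON library:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PayloadTimestamp {
    // Hypothetical epoch-millis field "ts" embedded in the JSON payload.
    private static final Pattern TS = Pattern.compile("\"ts\"\\s*:\\s*(\\d+)");

    // Return the embedded timestamp, or the fallback (e.g. the record's
    // metadata timestamp) when the payload carries no usable "ts" field.
    static long extract(String json, long fallback) {
        Matcher m = TS.matcher(json);
        return m.find() ? Long.parseLong(m.group(1)) : fallback;
    }

    public static void main(String[] args) {
        System.out.println(extract("{\"user\":\"u1\",\"ts\":1520254600000}", -1L)); // prints 1520254600000
        System.out.println(extract("{\"user\":\"u1\"}", -1L));                      // prints -1
    }
}
```

Note the fallback: returning a negative timestamp from an extractor causes Streams to drop the record, so falling back to the record's metadata timestamp is the usual defensive choice.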