Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-06 Thread Guozhang Wang
Thanks for creating the JIRA ticket.

The Streams library follows the "event-time" concept by default with the metadata
timestamp extractor, expecting that the timestamp set in this field reflects
"when the event happened in real time":

https://kafka.apache.org/10/documentation/streams/core-concepts#streams_time

Following that expectation, the timestamp Streams uses for windowed
aggregation results is the window start time, indicating that "events that
happened during this window in real time resulted in this aggregated value".
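
To illustrate (a minimal sketch, not from this thread; the topic names, key type,
and grouping lambda are placeholders), the window start carried in the windowed
key is the same value that ends up as the timestamp of each result record:

import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> in = builder.stream("events");    // placeholder source topic

final KTable<Windowed<String>, Long> summaries = in
   .groupBy((key, value) -> key)                                // placeholder grouping key
   .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1L)))
   .count();

summaries.toStream()
   // windowedKey.window().start() is the window start time, i.e. the same value
   // Streams assigns as the timestamp of each aggregation result record
   .map((windowedKey, count) ->
       KeyValue.pair(windowedKey.key() + "@" + windowedKey.window().start(), count))
   .to("hourly-counts", Produced.with(Serdes.String(), Serdes.Long()));  // placeholder sink topic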


Guozhang


On Tue, Mar 6, 2018 at 6:39 AM, Dmitriy Vsekhvalnov 
wrote:

> Guozhang,
>
> here we go with ticket: https://issues.apache.org/jira/browse/KAFKA-6614
>
> i'd also like to continue discussion little bit further about timestamps.
> Was trying to test with broker configured "CreateTime" and got question
> about sink topic timestamps, back to example:
>
> KTable summaries = in
>.groupBy((key, value) -> ..)
>.windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l)))
>.count();
>
> summaries.toStream().to(sink);
>
> Each record written to sink will get timestamp assigned to grouping window
> start time, which quite often will be in the past.
>
> What the logic behind that? Honestly was expected sink messages to get
> "now" timestamp.
>
>
> On Mon, Mar 5, 2018 at 11:48 PM, Guozhang Wang  wrote:
>
> > Sounds great! :)
> >
> > On Mon, Mar 5, 2018 at 12:28 PM, Dmitriy Vsekhvalnov <
> > dvsekhval...@gmail.com
> > > wrote:
> >
> > > Thanks, that's an option, i'll take a look at configuration.
> > >
> > > But yeah, i was thinking same, if streams relies on the fact that
> > internal
> > > topics should use 'CreateTime' configuration, then it is streams
> library
> > > responsibility to configure it.
> > >
> > > I can open a Jira ticket :)
> > >
> > > On Mon, Mar 5, 2018 at 11:18 PM, Guozhang Wang 
> > wrote:
> > >
> > > > Hello Dmitriy,
> > > >
> > > > In your case, you can override this config to CreateTime only for the
> > > > internal topics created by Streams, this is documented in
> > > >
> > > > https://kafka.apache.org/10/javadoc/org/apache/kafka/
> > > > streams/StreamsConfig.html#TOPIC_PREFIX
> > > >
> > > >
> > > > We are also discussing to always override the
> > log.message.timestamp.type
> > > > config for internal topics to CreateTime, I vaguely remember there
> is a
> > > > JIRA open for it in case you are interested in contributing to
> Streams
> > > > library.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Mon, Mar 5, 2018 at 10:50 AM, Dmitriy Vsekhvalnov <
> > > > dvsekhval...@gmail.com
> > > > > wrote:
> > > >
> > > > > Which effectively means given scenario is not working with
> > > LogAppendTime,
> > > > > correct? Because all internal re-partition topics will always
> contain
> > > > "now"
> > > > > instead of real timestamp from original payload message?
> > > > >
> > > > > Is kafka-streams designed to work with LogAppendTime at all? It
> > seems a
> > > > lot
> > > > > of stuff will NOT work correctly using
> > > > > built-in ExtractRecordMetadataTimestamp ?
> > > > >
> > > > > On Mon, Mar 5, 2018 at 9:30 PM, Guozhang Wang 
> > > > wrote:
> > > > >
> > > > > > If broker configures log.message.timestamp.type=LogAppendTime
> > > > > universally,
> > > > > > it will ignore whatever timestamp set in the message metadata and
> > > > > override
> > > > > > it with the append time. So when the messages are fetched by
> > > downstream
> > > > > > processors which always use the metadata timestamp extractor, it
> > will
> > > > get
> > > > > > the append timestamp set by brokers.
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Mon, Mar 5, 2018 at 9:53 AM, Dmitriy Vsekhvalnov <
> > > > > > dvsekhval...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Guozhang,
> > > > > > >
> > > > > > > interesting, will same logic applies (internal topic rewrite)
> for
> > > > > brokers
> > > > > > > configured with:
> > > > > > >   log.message.timestamp.type=LogAppendTime
> > > > > > >
> > > > > > > ?
> > > > > > >
> > > > > > > On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang <
> > wangg...@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hello Dmitriy,
> > > > > > > >
> > > > > > > > What you have observed is by design, and it maybe a bit
> > confusing
> > > > at
> > > > > > > first
> > > > > > > > place. Let me explain:
> > > > > > > >
> > > > > > > > When you do a group-by aggregation like the above case,
> during
> > > the
> > > > > > > > "groupBy((key,
> > > > > > > > value) -> ..)" stage Streams library will do a
> > > re-partitioning
> > > > by
> > > > > > > > sending the original data stream into an internal repartition
> > > topic
> > > > > > based
> > > > > > > > on the aggregation key defined in the "groupBy" function and
> > > fetch
> > > > > from
> > > > > > > > that topic again. This is similar to a shuffle phase in
> > > 

Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-06 Thread Dmitriy Vsekhvalnov
Guozhang,

here we go with ticket: https://issues.apache.org/jira/browse/KAFKA-6614

I'd also like to continue the discussion a little bit further about timestamps.
I was testing with the broker configured with "CreateTime" and have a question
about sink topic timestamps; back to the example:

KTable summaries = in
   .groupBy((key, value) -> ..)
   .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l)))
   .count();

summaries.toStream().to(sink);

Each record written to the sink gets its timestamp set to the grouping window's
start time, which quite often will be in the past.

What is the logic behind that? Honestly, I expected the sink messages to get a
"now" timestamp.


On Mon, Mar 5, 2018 at 11:48 PM, Guozhang Wang  wrote:

> Sounds great! :)
>
> On Mon, Mar 5, 2018 at 12:28 PM, Dmitriy Vsekhvalnov <
> dvsekhval...@gmail.com
> > wrote:
>
> > Thanks, that's an option, i'll take a look at configuration.
> >
> > But yeah, i was thinking same, if streams relies on the fact that
> internal
> > topics should use 'CreateTime' configuration, then it is streams library
> > responsibility to configure it.
> >
> > I can open a Jira ticket :)
> >
> > On Mon, Mar 5, 2018 at 11:18 PM, Guozhang Wang 
> wrote:
> >
> > > Hello Dmitriy,
> > >
> > > In your case, you can override this config to CreateTime only for the
> > > internal topics created by Streams, this is documented in
> > >
> > > https://kafka.apache.org/10/javadoc/org/apache/kafka/
> > > streams/StreamsConfig.html#TOPIC_PREFIX
> > >
> > >
> > > We are also discussing to always override the
> log.message.timestamp.type
> > > config for internal topics to CreateTime, I vaguely remember there is a
> > > JIRA open for it in case you are interested in contributing to Streams
> > > library.
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, Mar 5, 2018 at 10:50 AM, Dmitriy Vsekhvalnov <
> > > dvsekhval...@gmail.com
> > > > wrote:
> > >
> > > > Which effectively means given scenario is not working with
> > LogAppendTime,
> > > > correct? Because all internal re-partition topics will always contain
> > > "now"
> > > > instead of real timestamp from original payload message?
> > > >
> > > > Is kafka-streams designed to work with LogAppendTime at all? It
> seems a
> > > lot
> > > > of stuff will NOT work correctly using
> > > > built-in ExtractRecordMetadataTimestamp ?
> > > >
> > > > On Mon, Mar 5, 2018 at 9:30 PM, Guozhang Wang 
> > > wrote:
> > > >
> > > > > If broker configures log.message.timestamp.type=LogAppendTime
> > > > universally,
> > > > > it will ignore whatever timestamp set in the message metadata and
> > > > override
> > > > > it with the append time. So when the messages are fetched by
> > downstream
> > > > > processors which always use the metadata timestamp extractor, it
> will
> > > get
> > > > > the append timestamp set by brokers.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Mon, Mar 5, 2018 at 9:53 AM, Dmitriy Vsekhvalnov <
> > > > > dvsekhval...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Guozhang,
> > > > > >
> > > > > > interesting, will same logic applies (internal topic rewrite) for
> > > > brokers
> > > > > > configured with:
> > > > > >   log.message.timestamp.type=LogAppendTime
> > > > > >
> > > > > > ?
> > > > > >
> > > > > > On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang <
> wangg...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hello Dmitriy,
> > > > > > >
> > > > > > > What you have observed is by design, and it maybe a bit
> confusing
> > > at
> > > > > > first
> > > > > > > place. Let me explain:
> > > > > > >
> > > > > > > When you do a group-by aggregation like the above case, during
> > the
> > > > > > > "groupBy((key,
> > > > > > > value) -> ..)" stage Streams library will do a
> > re-partitioning
> > > by
> > > > > > > sending the original data stream into an internal repartition
> > topic
> > > > > based
> > > > > > > on the aggregation key defined in the "groupBy" function and
> > fetch
> > > > from
> > > > > > > that topic again. This is similar to a shuffle phase in
> > distributed
> > > > > > > computing frameworks to make sure the down stream aggregations
> > can
> > > be
> > > > > > done
> > > > > > > in parallel. When the "groupBy" operator sends the messages to
> > this
> > > > > > > repartition topic, it will set in the record metadata the
> > extracted
> > > > > > > timestamp from the payload, and hence for the downstream
> > > aggregation
> > > > > > > operator to read from this repartition topic, it is OK to
> always
> > > use
> > > > > > > the ExtractRecordMetadataTimestamp
> > > > > > > to extract that timestamp and use the extracted value to
> > determine
> > > > > which
> > > > > > > window this record should fall into.
> > > > > > >
> > > > > > > More details can be found in this JIRA:
> > > > > > >
> > > > > > > https://issues.apache.org/jira/browse/KAFKA-4785
> > > > > > >
> > > > > > >
> > > > > > > So the record 

Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-05 Thread Guozhang Wang
Sounds great! :)

On Mon, Mar 5, 2018 at 12:28 PM, Dmitriy Vsekhvalnov  wrote:

> Thanks, that's an option, i'll take a look at configuration.
>
> But yeah, i was thinking same, if streams relies on the fact that internal
> topics should use 'CreateTime' configuration, then it is streams library
> responsibility to configure it.
>
> I can open a Jira ticket :)
>
> On Mon, Mar 5, 2018 at 11:18 PM, Guozhang Wang  wrote:
>
> > Hello Dmitriy,
> >
> > In your case, you can override this config to CreateTime only for the
> > internal topics created by Streams, this is documented in
> >
> > https://kafka.apache.org/10/javadoc/org/apache/kafka/
> > streams/StreamsConfig.html#TOPIC_PREFIX
> >
> >
> > We are also discussing to always override the log.message.timestamp.type
> > config for internal topics to CreateTime, I vaguely remember there is a
> > JIRA open for it in case you are interested in contributing to Streams
> > library.
> >
> > Guozhang
> >
> >
> > On Mon, Mar 5, 2018 at 10:50 AM, Dmitriy Vsekhvalnov <
> > dvsekhval...@gmail.com
> > > wrote:
> >
> > > Which effectively means given scenario is not working with
> LogAppendTime,
> > > correct? Because all internal re-partition topics will always contain
> > "now"
> > > instead of real timestamp from original payload message?
> > >
> > > Is kafka-streams designed to work with LogAppendTime at all? It seems a
> > lot
> > > of stuff will NOT work correctly using
> > > built-in ExtractRecordMetadataTimestamp ?
> > >
> > > On Mon, Mar 5, 2018 at 9:30 PM, Guozhang Wang 
> > wrote:
> > >
> > > > If broker configures log.message.timestamp.type=LogAppendTime
> > > universally,
> > > > it will ignore whatever timestamp set in the message metadata and
> > > override
> > > > it with the append time. So when the messages are fetched by
> downstream
> > > > processors which always use the metadata timestamp extractor, it will
> > get
> > > > the append timestamp set by brokers.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Mar 5, 2018 at 9:53 AM, Dmitriy Vsekhvalnov <
> > > > dvsekhval...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Guozhang,
> > > > >
> > > > > interesting, will same logic applies (internal topic rewrite) for
> > > brokers
> > > > > configured with:
> > > > >   log.message.timestamp.type=LogAppendTime
> > > > >
> > > > > ?
> > > > >
> > > > > On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang 
> > > > wrote:
> > > > >
> > > > > > Hello Dmitriy,
> > > > > >
> > > > > > What you have observed is by design, and it maybe a bit confusing
> > at
> > > > > first
> > > > > > place. Let me explain:
> > > > > >
> > > > > > When you do a group-by aggregation like the above case, during
> the
> > > > > > "groupBy((key,
> > > > > > value) -> ..)" stage Streams library will do a
> re-partitioning
> > by
> > > > > > sending the original data stream into an internal repartition
> topic
> > > > based
> > > > > > on the aggregation key defined in the "groupBy" function and
> fetch
> > > from
> > > > > > that topic again. This is similar to a shuffle phase in
> distributed
> > > > > > computing frameworks to make sure the down stream aggregations
> can
> > be
> > > > > done
> > > > > > in parallel. When the "groupBy" operator sends the messages to
> this
> > > > > > repartition topic, it will set in the record metadata the
> extracted
> > > > > > timestamp from the payload, and hence for the downstream
> > aggregation
> > > > > > operator to read from this repartition topic, it is OK to always
> > use
> > > > > > the ExtractRecordMetadataTimestamp
> > > > > > to extract that timestamp and use the extracted value to
> determine
> > > > which
> > > > > > window this record should fall into.
> > > > > >
> > > > > > More details can be found in this JIRA:
> > > > > >
> > > > > > https://issues.apache.org/jira/browse/KAFKA-4785
> > > > > >
> > > > > >
> > > > > > So the record timestamp used during aggregation should be the
> same
> > as
> > > > the
> > > > > > one in the payload, if you do observe that is not the case, this
> is
> > > > > > unexpected. In that case could you share your complete code
> > snippet,
> > > > > > especially how input stream "in" is defined, and your config
> > > properties
> > > > > > defined for us to investigate?
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov <
> > > > > > dvsekhval...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Good morning,
> > > > > > >
> > > > > > > we have simple use-case where we want to count number of events
> > by
> > > > each
> > > > > > > hour grouped by some fields from event itself.
> > > > > > >
> > > > > > > Our event timestamp is embedded into messages itself (json) and
> > we
> > > > > using
> > > > > > > trivial custom timestamp extractor (which called and works as
> > > > > expected).
> > > > > > >
> > > > > > > What we facing 

Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-05 Thread Dmitriy Vsekhvalnov
Thanks, that's an option; I'll take a look at the configuration.

But yeah, I was thinking the same: if Streams relies on the fact that internal
topics should use the 'CreateTime' configuration, then it is the Streams
library's responsibility to configure it.

I can open a Jira ticket :)

On Mon, Mar 5, 2018 at 11:18 PM, Guozhang Wang  wrote:

> Hello Dmitriy,
>
> In your case, you can override this config to CreateTime only for the
> internal topics created by Streams, this is documented in
>
> https://kafka.apache.org/10/javadoc/org/apache/kafka/
> streams/StreamsConfig.html#TOPIC_PREFIX
>
>
> We are also discussing to always override the log.message.timestamp.type
> config for internal topics to CreateTime, I vaguely remember there is a
> JIRA open for it in case you are interested in contributing to Streams
> library.
>
> Guozhang
>
>
> On Mon, Mar 5, 2018 at 10:50 AM, Dmitriy Vsekhvalnov <
> dvsekhval...@gmail.com
> > wrote:
>
> > Which effectively means given scenario is not working with LogAppendTime,
> > correct? Because all internal re-partition topics will always contain
> "now"
> > instead of real timestamp from original payload message?
> >
> > Is kafka-streams designed to work with LogAppendTime at all? It seems a
> lot
> > of stuff will NOT work correctly using
> > built-in ExtractRecordMetadataTimestamp ?
> >
> > On Mon, Mar 5, 2018 at 9:30 PM, Guozhang Wang 
> wrote:
> >
> > > If broker configures log.message.timestamp.type=LogAppendTime
> > universally,
> > > it will ignore whatever timestamp set in the message metadata and
> > override
> > > it with the append time. So when the messages are fetched by downstream
> > > processors which always use the metadata timestamp extractor, it will
> get
> > > the append timestamp set by brokers.
> > >
> > >
> > > Guozhang
> > >
> > > On Mon, Mar 5, 2018 at 9:53 AM, Dmitriy Vsekhvalnov <
> > > dvsekhval...@gmail.com>
> > > wrote:
> > >
> > > > Hi Guozhang,
> > > >
> > > > interesting, will same logic applies (internal topic rewrite) for
> > brokers
> > > > configured with:
> > > >   log.message.timestamp.type=LogAppendTime
> > > >
> > > > ?
> > > >
> > > > On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang 
> > > wrote:
> > > >
> > > > > Hello Dmitriy,
> > > > >
> > > > > What you have observed is by design, and it maybe a bit confusing
> at
> > > > first
> > > > > place. Let me explain:
> > > > >
> > > > > When you do a group-by aggregation like the above case, during the
> > > > > "groupBy((key,
> > > > > value) -> ..)" stage Streams library will do a re-partitioning
> by
> > > > > sending the original data stream into an internal repartition topic
> > > based
> > > > > on the aggregation key defined in the "groupBy" function and fetch
> > from
> > > > > that topic again. This is similar to a shuffle phase in distributed
> > > > > computing frameworks to make sure the down stream aggregations can
> be
> > > > done
> > > > > in parallel. When the "groupBy" operator sends the messages to this
> > > > > repartition topic, it will set in the record metadata the extracted
> > > > > timestamp from the payload, and hence for the downstream
> aggregation
> > > > > operator to read from this repartition topic, it is OK to always
> use
> > > > > the ExtractRecordMetadataTimestamp
> > > > > to extract that timestamp and use the extracted value to determine
> > > which
> > > > > window this record should fall into.
> > > > >
> > > > > More details can be found in this JIRA:
> > > > >
> > > > > https://issues.apache.org/jira/browse/KAFKA-4785
> > > > >
> > > > >
> > > > > So the record timestamp used during aggregation should be the same
> as
> > > the
> > > > > one in the payload, if you do observe that is not the case, this is
> > > > > unexpected. In that case could you share your complete code
> snippet,
> > > > > especially how input stream "in" is defined, and your config
> > properties
> > > > > defined for us to investigate?
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov <
> > > > > dvsekhval...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Good morning,
> > > > > >
> > > > > > we have simple use-case where we want to count number of events
> by
> > > each
> > > > > > hour grouped by some fields from event itself.
> > > > > >
> > > > > > Our event timestamp is embedded into messages itself (json) and
> we
> > > > using
> > > > > > trivial custom timestamp extractor (which called and works as
> > > > expected).
> > > > > >
> > > > > > What we facing is that there is always timestamp used that coming
> > > > > > from ExtractRecordMetadataTimestamp when determining matching
> > windows
> > > > for
> > > > > > event, inside KStreamWindowAggregate.process() and never value
> > from
> > > > our
> > > > > > json timestamp extractor.
> > > > > >
> > > > > > Effectively it doesn't work correctly if we test on late data,
> e.g.
> > > > > > timestamp in a 

Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-05 Thread Guozhang Wang
Hello Dmitriy,

In your case, you can override this config to CreateTime only for the
internal topics created by Streams; this is documented in

https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/StreamsConfig.html#TOPIC_PREFIX
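
For example, a minimal sketch of that override (assuming the standard
StreamsConfig.topicPrefix() helper and TopicConfig constants; the other
property values are placeholders):

import java.util.Properties;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hourly-counts-app");     // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder
// StreamsConfig.topicPrefix() scopes a topic-level config to the internal
// (repartition / changelog) topics that Streams creates:
props.put(StreamsConfig.topicPrefix(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG),
          "CreateTime");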


We are also discussing always overriding the log.message.timestamp.type
config for internal topics to CreateTime; I vaguely remember there is a
JIRA open for it, in case you are interested in contributing to the Streams
library.

Guozhang


On Mon, Mar 5, 2018 at 10:50 AM, Dmitriy Vsekhvalnov  wrote:

> Which effectively means given scenario is not working with LogAppendTime,
> correct? Because all internal re-partition topics will always contain "now"
> instead of real timestamp from original payload message?
>
> Is kafka-streams designed to work with LogAppendTime at all? It seems a lot
> of stuff will NOT work correctly using
> built-in ExtractRecordMetadataTimestamp ?
>
> On Mon, Mar 5, 2018 at 9:30 PM, Guozhang Wang  wrote:
>
> > If broker configures log.message.timestamp.type=LogAppendTime
> universally,
> > it will ignore whatever timestamp set in the message metadata and
> override
> > it with the append time. So when the messages are fetched by downstream
> > processors which always use the metadata timestamp extractor, it will get
> > the append timestamp set by brokers.
> >
> >
> > Guozhang
> >
> > On Mon, Mar 5, 2018 at 9:53 AM, Dmitriy Vsekhvalnov <
> > dvsekhval...@gmail.com>
> > wrote:
> >
> > > Hi Guozhang,
> > >
> > > interesting, will same logic applies (internal topic rewrite) for
> brokers
> > > configured with:
> > >   log.message.timestamp.type=LogAppendTime
> > >
> > > ?
> > >
> > > On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang 
> > wrote:
> > >
> > > > Hello Dmitriy,
> > > >
> > > > What you have observed is by design, and it maybe a bit confusing at
> > > first
> > > > place. Let me explain:
> > > >
> > > > When you do a group-by aggregation like the above case, during the
> > > > "groupBy((key,
> > > > value) -> ..)" stage Streams library will do a re-partitioning by
> > > > sending the original data stream into an internal repartition topic
> > based
> > > > on the aggregation key defined in the "groupBy" function and fetch
> from
> > > > that topic again. This is similar to a shuffle phase in distributed
> > > > computing frameworks to make sure the down stream aggregations can be
> > > done
> > > > in parallel. When the "groupBy" operator sends the messages to this
> > > > repartition topic, it will set in the record metadata the extracted
> > > > timestamp from the payload, and hence for the downstream aggregation
> > > > operator to read from this repartition topic, it is OK to always use
> > > > the ExtractRecordMetadataTimestamp
> > > > to extract that timestamp and use the extracted value to determine
> > which
> > > > window this record should fall into.
> > > >
> > > > More details can be found in this JIRA:
> > > >
> > > > https://issues.apache.org/jira/browse/KAFKA-4785
> > > >
> > > >
> > > > So the record timestamp used during aggregation should be the same as
> > the
> > > > one in the payload, if you do observe that is not the case, this is
> > > > unexpected. In that case could you share your complete code snippet,
> > > > especially how input stream "in" is defined, and your config
> properties
> > > > defined for us to investigate?
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov <
> > > > dvsekhval...@gmail.com>
> > > > wrote:
> > > >
> > > > > Good morning,
> > > > >
> > > > > we have simple use-case where we want to count number of events by
> > each
> > > > > hour grouped by some fields from event itself.
> > > > >
> > > > > Our event timestamp is embedded into messages itself (json) and we
> > > using
> > > > > trivial custom timestamp extractor (which called and works as
> > > expected).
> > > > >
> > > > > What we facing is that there is always timestamp used that coming
> > > > > from ExtractRecordMetadataTimestamp when determining matching
> windows
> > > for
> > > > > event, inside KStreamWindowAggregate.process() and never value
> from
> > > our
> > > > > json timestamp extractor.
> > > > >
> > > > > Effectively it doesn't work correctly if we test on late data, e.g.
> > > > > timestamp in a message hour ago from now for instance. Topology
> > always
> > > > > calculating matching hour bucket (window) using record timestamp,
> not
> > > > > payload.
> > > > >
> > > > > Is it expected behaviour ? Are we getting windowing wrong? Any
> > settings
> > > > or
> > > > > other tricks to accommodate our use-case?
> > > > >
> > > > > For reference our setup: brokers, kafka-stream and kafka-clients
> all
> > of
> > > > > v1.0.0
> > > > > And here is code:
> > > > >
> > > > > KTable summaries = in
> > > > >.groupBy((key, value) -> ..)
> > > > >

Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-05 Thread Dmitriy Vsekhvalnov
Which effectively means the given scenario does not work with LogAppendTime,
correct? Because all internal re-partition topics will always contain "now"
instead of the real timestamp from the original payload message?

Is kafka-streams designed to work with LogAppendTime at all? It seems a lot
of stuff will NOT work correctly using the built-in
ExtractRecordMetadataTimestamp?

On Mon, Mar 5, 2018 at 9:30 PM, Guozhang Wang  wrote:

> If broker configures log.message.timestamp.type=LogAppendTime universally,
> it will ignore whatever timestamp set in the message metadata and override
> it with the append time. So when the messages are fetched by downstream
> processors which always use the metadata timestamp extractor, it will get
> the append timestamp set by brokers.
>
>
> Guozhang
>
> On Mon, Mar 5, 2018 at 9:53 AM, Dmitriy Vsekhvalnov <
> dvsekhval...@gmail.com>
> wrote:
>
> > Hi Guozhang,
> >
> > interesting, will same logic applies (internal topic rewrite) for brokers
> > configured with:
> >   log.message.timestamp.type=LogAppendTime
> >
> > ?
> >
> > On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang 
> wrote:
> >
> > > Hello Dmitriy,
> > >
> > > What you have observed is by design, and it maybe a bit confusing at
> > first
> > > place. Let me explain:
> > >
> > > When you do a group-by aggregation like the above case, during the
> > > "groupBy((key,
> > > value) -> ..)" stage Streams library will do a re-partitioning by
> > > sending the original data stream into an internal repartition topic
> based
> > > on the aggregation key defined in the "groupBy" function and fetch from
> > > that topic again. This is similar to a shuffle phase in distributed
> > > computing frameworks to make sure the down stream aggregations can be
> > done
> > > in parallel. When the "groupBy" operator sends the messages to this
> > > repartition topic, it will set in the record metadata the extracted
> > > timestamp from the payload, and hence for the downstream aggregation
> > > operator to read from this repartition topic, it is OK to always use
> > > the ExtractRecordMetadataTimestamp
> > > to extract that timestamp and use the extracted value to determine
> which
> > > window this record should fall into.
> > >
> > > More details can be found in this JIRA:
> > >
> > > https://issues.apache.org/jira/browse/KAFKA-4785
> > >
> > >
> > > So the record timestamp used during aggregation should be the same as
> the
> > > one in the payload, if you do observe that is not the case, this is
> > > unexpected. In that case could you share your complete code snippet,
> > > especially how input stream "in" is defined, and your config properties
> > > defined for us to investigate?
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov <
> > > dvsekhval...@gmail.com>
> > > wrote:
> > >
> > > > Good morning,
> > > >
> > > > we have simple use-case where we want to count number of events by
> each
> > > > hour grouped by some fields from event itself.
> > > >
> > > > Our event timestamp is embedded into messages itself (json) and we
> > using
> > > > trivial custom timestamp extractor (which called and works as
> > expected).
> > > >
> > > > What we facing is that there is always timestamp used that coming
> > > > from ExtractRecordMetadataTimestamp when determining matching windows
> > for
> > > > event, inside KStreamWindowAggregate.process() and never value from
> > our
> > > > json timestamp extractor.
> > > >
> > > > Effectively it doesn't work correctly if we test on late data, e.g.
> > > > timestamp in a message hour ago from now for instance. Topology
> always
> > > > calculating matching hour bucket (window) using record timestamp, not
> > > > payload.
> > > >
> > > > Is it expected behaviour ? Are we getting windowing wrong? Any
> settings
> > > or
> > > > other tricks to accommodate our use-case?
> > > >
> > > > For reference our setup: brokers, kafka-stream and kafka-clients all
> of
> > > > v1.0.0
> > > > And here is code:
> > > >
> > > > KTable summaries = in
> > > >.groupBy((key, value) -> ..)
> > > >.windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l)))
> > > >.count();
> > > >
> > > > Thank you.
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-05 Thread Guozhang Wang
If the broker is configured with log.message.timestamp.type=LogAppendTime
universally, it will ignore whatever timestamp is set in the message metadata
and override it with the append time. So when the messages are fetched by
downstream processors, which always use the metadata timestamp extractor, they
will get the append timestamp set by the brokers.
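
As a small illustration with the plain Java consumer (the helper below is
hypothetical, not from this thread): every record fetched from such a broker
reports LOG_APPEND_TIME and the broker's append time, regardless of what the
producer or Streams originally set.

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

static void dumpTimestamps(final Consumer<String, String> consumer) {
    final ConsumerRecords<String, String> records = consumer.poll(1000L);
    for (final ConsumerRecord<String, String> rec : records) {
        // timestampType() is LOG_APPEND_TIME and timestamp() is the broker
        // append time, not the timestamp set by the producer
        System.out.println(rec.timestampType() + " " + rec.timestamp());
    }
}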


Guozhang

On Mon, Mar 5, 2018 at 9:53 AM, Dmitriy Vsekhvalnov 
wrote:

> Hi Guozhang,
>
> interesting, will same logic applies (internal topic rewrite) for brokers
> configured with:
>   log.message.timestamp.type=LogAppendTime
>
> ?
>
> On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang  wrote:
>
> > Hello Dmitriy,
> >
> > What you have observed is by design, and it maybe a bit confusing at
> first
> > place. Let me explain:
> >
> > When you do a group-by aggregation like the above case, during the
> > "groupBy((key,
> > value) -> ..)" stage Streams library will do a re-partitioning by
> > sending the original data stream into an internal repartition topic based
> > on the aggregation key defined in the "groupBy" function and fetch from
> > that topic again. This is similar to a shuffle phase in distributed
> > computing frameworks to make sure the down stream aggregations can be
> done
> > in parallel. When the "groupBy" operator sends the messages to this
> > repartition topic, it will set in the record metadata the extracted
> > timestamp from the payload, and hence for the downstream aggregation
> > operator to read from this repartition topic, it is OK to always use
> > the ExtractRecordMetadataTimestamp
> > to extract that timestamp and use the extracted value to determine which
> > window this record should fall into.
> >
> > More details can be found in this JIRA:
> >
> > https://issues.apache.org/jira/browse/KAFKA-4785
> >
> >
> > So the record timestamp used during aggregation should be the same as the
> > one in the payload, if you do observe that is not the case, this is
> > unexpected. In that case could you share your complete code snippet,
> > especially how input stream "in" is defined, and your config properties
> > defined for us to investigate?
> >
> > Guozhang
> >
> >
> > On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov <
> > dvsekhval...@gmail.com>
> > wrote:
> >
> > > Good morning,
> > >
> > > we have simple use-case where we want to count number of events by each
> > > hour grouped by some fields from event itself.
> > >
> > > Our event timestamp is embedded into messages itself (json) and we
> using
> > > trivial custom timestamp extractor (which called and works as
> expected).
> > >
> > > What we facing is that there is always timestamp used that coming
> > > from ExtractRecordMetadataTimestamp when determining matching windows
> for
> > > event, inside KStreamWindowAggregate.process() and never value from
> our
> > > json timestamp extractor.
> > >
> > > Effectively it doesn't work correctly if we test on late data, e.g.
> > > timestamp in a message hour ago from now for instance. Topology always
> > > calculating matching hour bucket (window) using record timestamp, not
> > > payload.
> > >
> > > Is it expected behaviour ? Are we getting windowing wrong? Any settings
> > or
> > > other tricks to accommodate our use-case?
> > >
> > > For reference our setup: brokers, kafka-stream and kafka-clients all of
> > > v1.0.0
> > > And here is code:
> > >
> > > KTable summaries = in
> > >.groupBy((key, value) -> ..)
> > >.windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l)))
> > >.count();
> > >
> > > Thank you.
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang


Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-05 Thread Dmitriy Vsekhvalnov
Hi Guozhang,

interesting, will the same logic apply (the internal topic rewrite) for brokers
configured with:
  log.message.timestamp.type=LogAppendTime

?

On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang  wrote:

> Hello Dmitriy,
>
> What you have observed is by design, and it maybe a bit confusing at first
> place. Let me explain:
>
> When you do a group-by aggregation like the above case, during the
> "groupBy((key,
> value) -> ..)" stage Streams library will do a re-partitioning by
> sending the original data stream into an internal repartition topic based
> on the aggregation key defined in the "groupBy" function and fetch from
> that topic again. This is similar to a shuffle phase in distributed
> computing frameworks to make sure the down stream aggregations can be done
> in parallel. When the "groupBy" operator sends the messages to this
> repartition topic, it will set in the record metadata the extracted
> timestamp from the payload, and hence for the downstream aggregation
> operator to read from this repartition topic, it is OK to always use
> the ExtractRecordMetadataTimestamp
> to extract that timestamp and use the extracted value to determine which
> window this record should fall into.
>
> More details can be found in this JIRA:
>
> https://issues.apache.org/jira/browse/KAFKA-4785
>
>
> So the record timestamp used during aggregation should be the same as the
> one in the payload, if you do observe that is not the case, this is
> unexpected. In that case could you share your complete code snippet,
> especially how input stream "in" is defined, and your config properties
> defined for us to investigate?
>
> Guozhang
>
>
> On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov <
> dvsekhval...@gmail.com>
> wrote:
>
> > Good morning,
> >
> > we have simple use-case where we want to count number of events by each
> > hour grouped by some fields from event itself.
> >
> > Our event timestamp is embedded into messages itself (json) and we using
> > trivial custom timestamp extractor (which called and works as expected).
> >
> > What we facing is that there is always timestamp used that coming
> > from ExtractRecordMetadataTimestamp when determining matching windows for
> > event, inside KStreamWindowAggregate.process() and never value from our
> > json timestamp extractor.
> >
> > Effectively it doesn't work correctly if we test on late data, e.g.
> > timestamp in a message hour ago from now for instance. Topology always
> > calculating matching hour bucket (window) using record timestamp, not
> > payload.
> >
> > Is it expected behaviour ? Are we getting windowing wrong? Any settings
> or
> > other tricks to accommodate our use-case?
> >
> > For reference our setup: brokers, kafka-stream and kafka-clients all of
> > v1.0.0
> > And here is code:
> >
> > KTable summaries = in
> >.groupBy((key, value) -> ..)
> >.windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l)))
> >.count();
> >
> > Thank you.
> >
>
>
>
> --
> -- Guozhang
>


Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-05 Thread Guozhang Wang
Hello Dmitriy,

What you have observed is by design, and it may be a bit confusing at first.
Let me explain:

When you do a group-by aggregation like in the case above, during the
"groupBy((key, value) -> ..)" stage the Streams library performs a
re-partitioning: it sends the original data stream into an internal
repartition topic, based on the aggregation key defined in the "groupBy"
function, and fetches from that topic again. This is similar to a shuffle
phase in distributed computing frameworks, and it makes sure the downstream
aggregations can be done in parallel. When the "groupBy" operator sends the
messages to this repartition topic, it sets the timestamp extracted from the
payload in the record metadata; hence, for the downstream aggregation operator
reading from this repartition topic, it is OK to always use
ExtractRecordMetadataTimestamp to extract that timestamp and use the extracted
value to determine which window the record should fall into.
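
For concreteness, here is a minimal sketch of such a payload-based extractor
(the class name, payload type, and field accessor are illustrative, not from
this thread); whatever it returns becomes the record timestamp written to the
repartition topic, and hence decides the window:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class JsonEventTimeExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        final Object value = record.value();
        if (value instanceof MyEvent) {                    // hypothetical deserialized payload type
            return ((MyEvent) value).getEventTimeMs();     // event time embedded in the JSON payload
        }
        return record.timestamp();                         // fall back to the metadata timestamp
    }
}

Assuming the 1.0 config name, it would be registered via
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
JsonEventTimeExtractor.class).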

More details can be found in this JIRA:

https://issues.apache.org/jira/browse/KAFKA-4785


So the record timestamp used during aggregation should be the same as the one
in the payload; if you observe that this is not the case, it is unexpected. In
that case, could you share your complete code snippet, especially how the input
stream "in" is defined, and the config properties you defined, for us to
investigate?

Guozhang


On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov 
wrote:

> Good morning,
>
> we have simple use-case where we want to count number of events by each
> hour grouped by some fields from event itself.
>
> Our event timestamp is embedded into messages itself (json) and we using
> trivial custom timestamp extractor (which called and works as expected).
>
> What we facing is that there is always timestamp used that coming
> from ExtractRecordMetadataTimestamp when determining matching windows for
> event, inside KStreamWindowAggregate.process() and never value from our
> json timestamp extractor.
>
> Effectively it doesn't work correctly if we test on late data, e.g.
> timestamp in a message hour ago from now for instance. Topology always
> calculating matching hour bucket (window) using record timestamp, not
> payload.
>
> Is it expected behaviour ? Are we getting windowing wrong? Any settings or
> other tricks to accommodate our use-case?
>
> For reference our setup: brokers, kafka-stream and kafka-clients all of
> v1.0.0
> And here is code:
>
> KTable summaries = in
>.groupBy((key, value) -> ..)
>.windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l)))
>.count();
>
> Thank you.
>



-- 
-- Guozhang


kafka-streams, late data, tumbling windows aggregations and event time

2018-03-05 Thread Dmitriy Vsekhvalnov
Good morning,

we have a simple use-case where we want to count the number of events per
hour, grouped by some fields from the event itself.

Our event timestamp is embedded in the message itself (JSON) and we are using
a trivial custom timestamp extractor (which is called and works as expected).

What we are facing is that the timestamp coming from
ExtractRecordMetadataTimestamp is always used when determining the matching
windows for an event, inside KStreamWindowAggregate.process(), and never the
value from our JSON timestamp extractor.

Effectively it doesn't work correctly when we test with late data, e.g. a
message whose timestamp is an hour before now. The topology always calculates
the matching hour bucket (window) using the record timestamp, not the payload.

Is this expected behaviour? Are we getting windowing wrong? Are there any
settings or other tricks to accommodate our use-case?

For reference, our setup: brokers, kafka-streams, and kafka-clients are all
v1.0.0. And here is the code:

KTable summaries = in
   .groupBy((key, value) -> ..)
   .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l)))
   .count();

Thank you.