Good morning,

We have a simple use case where we want to count the number of events per
hour, grouped by some fields from the event itself.

Our event timestamp is embedded in the message itself (JSON), and we are using
a trivial custom timestamp extractor (which is called and works as expected).

The problem we are facing is that the timestamp used to determine the matching
windows for an event, inside KStreamWindowAggregate.process(), always comes
from ExtractRecordMetadataTimestamp and never from our JSON timestamp
extractor.

As a result, it does not work correctly when we test with late data, e.g. a
message whose timestamp is an hour before now. The topology always
calculates the matching hour bucket (window) using the record timestamp, not
the timestamp in the payload.
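
To make the symptom concrete, here is a minimal plain-Java sketch of how we
understand the hour bucket to be derived from a timestamp. It has no Kafka
dependency; the class and method names are hypothetical, and the epoch-aligned
rounding is our assumption about how tumbling windows are placed, not code
taken from Kafka Streams.

```java
import java.util.concurrent.TimeUnit;

public class HourBucket {
    // One-hour tumbling window, same size as in our topology.
    public static final long WINDOW_SIZE_MS = TimeUnit.HOURS.toMillis(1);

    // Start of the epoch-aligned window containing the given timestamp.
    // Assumes a non-negative timestamp in milliseconds since the epoch.
    public static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    public static void main(String[] args) {
        // Record (metadata) timestamp: roughly "now" when the test message is produced.
        long recordTs = System.currentTimeMillis();
        // Payload timestamp: one hour earlier, as in our late-data test.
        long payloadTs = recordTs - WINDOW_SIZE_MS;

        System.out.println("bucket from record timestamp:  " + windowStart(recordTs));
        System.out.println("bucket from payload timestamp: " + windowStart(payloadTs));
    }
}
```

The two printed window starts differ by exactly one hour. We expect the count
to land in the second bucket, but we observe it in the first.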

Is this expected behaviour? Are we getting windowing wrong? Are there any
settings or other tricks to accommodate our use case?

For reference, our setup: brokers, kafka-streams and kafka-clients are all
v1.0.0.
And here is the code:

KTable<Windowed<Tuple>, Long> summaries = in
   .groupBy((key, value) -> ......)
   .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1L)))
   .count();

Thank you.
