Good morning. We have a simple use case: we want to count the number of events per hour, grouped by some fields from the event itself.
Our event timestamp is embedded in the message itself (JSON), and we use a trivial custom timestamp extractor, which is called and works as expected.

What we are seeing is that when KStreamWindowAggregate.process() determines the matching windows for an event, it always uses the timestamp coming from ExtractRecordMetadataTimestamp, never the value from our JSON timestamp extractor. As a result, it does not behave correctly when we test with late data: for example, a message whose payload timestamp is an hour in the past is still assigned to the hour bucket (window) computed from the record metadata timestamp, not from the payload.

Is this expected behaviour? Are we getting windowing wrong? Are there any settings or other tricks to accommodate our use case?

For reference, our setup: brokers, kafka-streams, and kafka-clients are all v1.0.0. Here is the code:

    KTable<Windowed<Tuple>, Long> summaries = in
        .groupBy((key, value) -> ......)
        .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1L)))
        .count();

Thank you.
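For completeness, this is roughly how we wire the extractor in (a config fragment only; `JsonPayloadTimestampExtractor` is a stand-in name for our actual extractor class):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Config fragment; JsonPayloadTimestampExtractor is a placeholder for our class.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hourly-counts");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
          JsonPayloadTimestampExtractor.class.getName());
```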
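To make the expectation concrete: for a tumbling 1-hour window, we expect each event to land in the window aligned to its payload timestamp, i.e. the window start is the timestamp rounded down to a multiple of the window size. A minimal plain-Java sketch of that expectation (`windowStartFor` is a hypothetical helper, not Kafka Streams API):

```java
// Sketch of the window alignment we expect for tumbling 1-hour windows:
// the matching window start is the payload timestamp rounded down to the
// nearest multiple of the window size.
public class WindowBucketSketch {
    static final long WINDOW_SIZE_MS = 60L * 60L * 1000L; // 1 hour in ms

    // Hypothetical helper: start of the hour-sized window containing the timestamp.
    static long windowStartFor(long payloadTimestampMs) {
        return payloadTimestampMs - (payloadTimestampMs % WINDOW_SIZE_MS);
    }

    public static void main(String[] args) {
        long eventTs = 1_500_000_000_000L;      // example payload timestamp
        long lateTs = eventTs - WINDOW_SIZE_MS; // same event, one hour earlier
        // A one-hour-late event should fall in the previous window,
        // so the window starts differ by exactly one window size.
        System.out.println(windowStartFor(eventTs) - windowStartFor(lateTs));
    }
}
```

In our case the bucket appears to be computed this way from the record metadata timestamp rather than the payload timestamp.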