Let me clarify the question a little bit. Consider the following code:

    stream
        .mapValues(...)
        .groupBy(...)
        .windowedBy(TimeWindows.of(Duration.ofSeconds(60)).grace(Duration.ofSeconds(10)))
        .aggregate(...);
I assume the mapValues() operation could be slow for some tasks for whatever reason, so tasks process messages at a different pace. When the shuffle happens at the aggregate() operator, task 0 could have processed messages up to time T while task 1 is still at (T - skew), yet messages from both tasks end up interleaved in a single partition of the internal repartition topic (the one corresponding to the grouping key).

My concern is that when the skew is large enough (more than 10 seconds in my example), messages from the lagging task 1 will be dropped.

Regards,
Boris

On 2020/11/16 10:33:10, Boris <b.sukhi...@gmail.com> wrote:
> Hi All,
>
> Let's consider a topic with multiple partitions and messages written in
> event-time order without any particular partitioning scheme. A Kafka Streams
> application does some transformations on these messages, then groups them by
> some key, and then aggregates them over an event-time window with a given
> grace period.
>
> Each task could process incoming messages at a different speed (e.g., because
> it runs on servers with different performance characteristics). This means
> that after the groupBy shuffle, event-time ordering will not be preserved
> between messages in the same partition of the internal topic when they
> originate from different tasks. After a while, this event-time skew could
> become larger than the aggregation window size plus the grace period, which
> would lead to dropping messages originating from the lagging task.
>
> Is this a real concern, especially when processing large amounts of
> historical data? Increasing the grace period doesn't seem like a valid
> option, because it would delay emitting the final aggregation result. Apache
> Flink handles this by emitting the lowest watermark when merging partitions.
> Does Kafka Streams offer something to deal with this scenario?
>
> Regards,
> Boris
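To make the drop condition concrete, here is a rough plain-Java sketch of the windowing arithmetic described above (60-second tumbling windows, 10-second grace): a record is dropped once observed stream time has passed its window end plus the grace period. The class and method names are hypothetical, not a Kafka Streams API; this only illustrates the semantics in question.

```java
import java.time.Duration;

public class LateRecordCheck {
    static final long WINDOW_SIZE_MS = Duration.ofSeconds(60).toMillis();
    static final long GRACE_MS = Duration.ofSeconds(10).toMillis();

    // Start of the tumbling window that a record with this timestamp falls into.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Sketch of the drop rule: the record is dropped when observed stream time
    // has already passed window end + grace period.
    static boolean isDropped(long recordTsMs, long streamTimeMs) {
        long windowEnd = windowStart(recordTsMs) + WINDOW_SIZE_MS;
        return streamTimeMs >= windowEnd + GRACE_MS;
    }

    public static void main(String[] args) {
        long streamTime = 1_000_000_000L; // stream time advanced by the fast task 0

        // task 1 lags by 5 s: still within window + grace, kept
        System.out.println(isDropped(streamTime - Duration.ofSeconds(5).toMillis(), streamTime));
        // task 1 lags by 80 s: past window size + grace, dropped
        System.out.println(isDropped(streamTime - Duration.ofSeconds(80).toMillis(), streamTime));
    }
}
```

With these numbers, a skew of 5 seconds is harmless, while a skew of 80 seconds (larger than window size + grace) makes the lagging task's records late.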
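For comparison, the Flink behavior mentioned in the original message (emitting the lowest watermark when merging inputs) can be sketched in a few lines of plain Java. The class below is hypothetical and not part of any Kafka Streams or Flink API; it only shows why a min-merge lets a lagging input hold event time back instead of having its records dropped.

```java
import java.util.Arrays;

public class MinWatermarkMerge {
    // Highest event timestamp seen so far on each input partition.
    private final long[] partitionWatermarks;

    public MinWatermarkMerge(int numPartitions) {
        partitionWatermarks = new long[numPartitions];
        Arrays.fill(partitionWatermarks, Long.MIN_VALUE);
    }

    // Record an event from one partition and return the merged watermark:
    // the minimum across all partitions, so the slowest input dominates.
    public long onEvent(int partition, long eventTimeMs) {
        partitionWatermarks[partition] =
                Math.max(partitionWatermarks[partition], eventTimeMs);
        return Arrays.stream(partitionWatermarks).min().getAsLong();
    }
}
```

Under this policy, task 1 lagging behind task 0 merely delays window closing (the merged watermark stays at task 1's position) rather than causing task 1's records to be considered late.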