[ https://issues.apache.org/jira/browse/KAFKA-16925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854839#comment-17854839 ]
Ayoub Omari commented on KAFKA-16925:
-------------------------------------

Thanks for these detailed clarifications! I see your point now. It would be like maintaining an independent time tracker for each processor... Anyway, I won't consider the fix that uses _context.currentStreamTimeMs_ for this.

> stream-table join does not immediately forward expired records on restart
> -------------------------------------------------------------------------
>
> Key: KAFKA-16925
> URL: https://issues.apache.org/jira/browse/KAFKA-16925
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Ayoub Omari
> Assignee: Ayoub Omari
> Priority: Major
>
> [KIP-923|https://cwiki.apache.org/confluence/display/KAFKA/KIP-923%3A+Add+A+Grace+Period+to+Stream+Table+Join]
> introduced a grace period for KStreamKTableJoin. This allows joining a stream
> to a KTable backed by a versioned state store.
> Upon receiving a record, it is put in a buffer until the grace period has elapsed.
> When the grace period elapses, the record is joined with its most recent
> match from the versioned state store.
> +Late records+ are +not+ put in the buffer and are immediately joined.
>
> {code:java}
> If the grace period is non zero, the record will enter a stream buffer and
> will dequeue when the record timestamp is less than or equal to stream time
> minus the grace period. Late records, out of the grace period, will be
> executed right as they come in. (KIP-923){code}
>
> However, this is not the case today on rebalance or restart.
> The reason is that observedStreamTime is taken from the underlying state store,
> which loses this information on rebalance/restart:
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java#L164]
>
> If the task restarts and receives an expired record, the buffer considers
> that this record has the maximum stream time observed so far, and puts it in
> the buffer instead of immediately joining it.
>
> {*}Example{*}:
> * Grace period = 60s
> * KTable contains (key, rightValue)
>
> +Normal scenario+
> {code:java}
> streamInput1 (key, value1) <--- time = T : put in buffer
> streamInput2 (key, value2) <--- time = T - 60s : immediately joined // streamTime = T{code}
>
> +Scenario with rebalance+
> {code:java}
> streamInput1 (key, value1) <--- time = T : put in buffer
> // --- rebalance ---
> streamInput2 (key, value2) <--- time = T - 60s : put in buffer // streamTime = T - 60s{code}
>
> The processor should use currentStreamTime from the context instead, which is
> recovered on restart.
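For readers following the thread, here is a minimal, self-contained sketch of the buffering rule quoted from KIP-923 and of the two scenarios in the example. It is not the Kafka Streams implementation: the class GraceBufferSketch, its methods, and the record type are hypothetical, the versioned-store lookup is replaced by simply collecting "joined" records, and a restart is modeled (as an assumption) as "buffered records survive, the buffer's own tracked stream time resets to -1". It only reproduces the symptom and does not propose a fix.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical, simplified model of the KIP-923 grace-period buffer (not Kafka code).
final class GraceBufferSketch {
    // Hypothetical stream-side record: key, value, event timestamp in ms.
    public record StreamRecord(String key, String value, long timestamp) {}

    private final long graceMs;
    // Records waiting for their grace period to elapse, oldest timestamp first.
    private final PriorityQueue<StreamRecord> buffer =
            new PriorityQueue<>(Comparator.comparingLong(StreamRecord::timestamp));
    // Stream time tracked by the buffer itself; -1 means "nothing observed yet".
    private long observedStreamTime = -1L;
    // Stand-in for "joined with the versioned store and forwarded", in emission order.
    private final List<StreamRecord> joined = new ArrayList<>();

    GraceBufferSketch(long graceMs) {
        this.graceMs = graceMs;
    }

    // Returns true if the record is joined immediately (late), false if it is buffered.
    boolean process(StreamRecord rec) {
        observedStreamTime = Math.max(observedStreamTime, rec.timestamp());
        long deadline = observedStreamTime - graceMs;
        // Dequeue buffered records whose timestamp is <= stream time minus grace.
        while (!buffer.isEmpty() && buffer.peek().timestamp() <= deadline) {
            joined.add(buffer.poll());
        }
        if (rec.timestamp() <= deadline) {
            // Late record: outside the grace period, so it is joined right away.
            joined.add(rec);
            return true;
        }
        buffer.add(rec);
        return false;
    }

    // Models the reported behavior: buffered records are kept, tracked stream time is lost.
    void simulateRestart() {
        observedStreamTime = -1L;
    }

    long observedStreamTime() { return observedStreamTime; }

    List<StreamRecord> joined() { return joined; }

    public static void main(String[] args) {
        long t = 1_000_000L;
        long grace = 60_000L;

        GraceBufferSketch normal = new GraceBufferSketch(grace);
        normal.process(new StreamRecord("key", "value1", t));
        boolean normalJoined = normal.process(new StreamRecord("key", "value2", t - grace));
        // prints "normal: immediately joined = true, streamTime = 1000000"
        System.out.println("normal: immediately joined = " + normalJoined
                + ", streamTime = " + normal.observedStreamTime());

        GraceBufferSketch restarted = new GraceBufferSketch(grace);
        restarted.process(new StreamRecord("key", "value1", t));
        restarted.simulateRestart();
        boolean restartJoined = restarted.process(new StreamRecord("key", "value2", t - grace));
        // prints "after restart: immediately joined = false, streamTime = 940000"
        System.out.println("after restart: immediately joined = " + restartJoined
                + ", streamTime = " + restarted.observedStreamTime());
    }
}
```

With T = 1,000,000 ms and a 60 s grace period, the second record is joined immediately in the normal run, but after the simulated restart the buffer's stream time falls back to T - 60s and the same record is buffered, matching the "Scenario with rebalance" above.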