[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848791#comment-16848791
 ] 

Bruno Cadonna commented on KAFKA-7994:
--------------------------------------

I ran into a case where resetting the stream time to -1 after a restart 
produced incorrect results. More specifically, an event that should have 
been dropped because the grace period was exceeded opened a window.

Suppose the following application:
{code:java}
builder
   .stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
   .groupByKey()
   .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ofMinutes(30)))
   ...
{code}

with the following input (numbers are timestamps):
m1, m2, m3, m61, m4, m90, m5
application restart
m6, m150

With the reset of the stream time to -1 after restart, the application will 
create the following windows:
[0, 60), {m1, m2, m3, m4}
[0, 60), {m6}
[60, 120), {m61, m90}

Window [0, 60), {m6} is wrong because if there had not been a restart, stream 
time would be at 90 when m6 reaches the processor. At stream time 90, the grace 
period of window [0, 60) has expired and m6 should be dropped. However, with the 
restart, stream time is reset to -1 and m6 is not dropped because the condition 
to drop events, timestamp < stream-time - grace-period, is not satisfied, i.e., 
6 < -1 - 30 is false. Hence, a second window [0, 60) is incorrectly opened.
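
The scenario above can be replayed with a small stand-alone simulation. This 
is only a sketch, not the Kafka Streams implementation: the class and method 
names are made up for illustration, and it assumes that a record is dropped 
once the end of its window is no longer greater than stream-time minus the 
grace period. That window-based form of the check is an assumption chosen 
because it reproduces the windows listed above (m4 accepted at stream time 61, 
m5 dropped at stream time 90).

```java
import java.util.ArrayList;
import java.util.List;

class LateRecordDemo {
    static final long WINDOW_SIZE = 60L; // window size from the example
    static final long GRACE = 30L;       // grace period from the example

    // Returns the timestamps that are accepted into a window, starting from
    // the given stream time. Assumption: a record is dropped once the end of
    // its window is no longer greater than stream-time - grace-period.
    static List<Long> accepted(long initialStreamTime, long[] timestamps) {
        long streamTime = initialStreamTime;
        List<Long> kept = new ArrayList<>();
        for (long ts : timestamps) {
            // Stream time only moves forward.
            streamTime = Math.max(streamTime, ts);
            // End of the tumbling window that contains ts.
            long windowEnd = ts - (ts % WINDOW_SIZE) + WINDOW_SIZE;
            if (windowEnd > streamTime - GRACE) {
                kept.add(ts);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        long[] afterRestart = {6L, 150L};
        // Stream time reset to -1 on restart: m6 is accepted.
        System.out.println(accepted(-1L, afterRestart));
        // Stream time restored to 90 on restart: m6 is dropped.
        System.out.println(accepted(90L, afterRestart));
    }
}
```

Under these assumptions, the only difference between the two runs is the 
initial stream time, which is enough to decide whether m6 opens a second 
[0, 60) window.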

> Improve Stream-Time for rebalances and restarts
> -----------------------------------------------
>
>                 Key: KAFKA-7994
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7994
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Richard Yu
>            Priority: Major
>         Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as the maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or dropping them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In effect, we forget the 
> current stream-time in this case, which may lead to non-deterministic behavior 
> if we stop processing right before a late record that would be dropped if we 
> continued processing but is not dropped after a rebalance/restart. Let's look 
> at an example with a grace period of 5ms for a tumbling window of 5ms, and 
> the following records (timestamps in parentheses):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11 and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is that stream-time then advances differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we stopped processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would advance correctly from a global point of view.
> A potential fix would be to store the latest observed partition-time in the 
> metadata of committed offsets. That way, on restart/rebalance we can 
> re-initialize time correctly.
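
The fix proposed in the description could look roughly like the sketch below. 
It is not the attached patch: the class, the method names, and the plain 
decimal encoding are hypothetical, and the sketch only models the metadata 
string that would presumably be committed together with the offset. It uses 
the late-record condition exactly as stated in the description, with the 
records r1(0) r2(5) r3(11) r4(2) and a grace period of 5ms.

```java
class PartitionTimeMetadata {
    static final long UNKNOWN = -1L; // initial stream-time, as in the description

    // Hypothetical encoding: the partition-time as a decimal string.
    static String encode(long partitionTime) {
        return Long.toString(partitionTime);
    }

    // Falls back to UNKNOWN when no usable metadata was committed.
    static long decode(String metadata) {
        if (metadata == null || metadata.isEmpty()) {
            return UNKNOWN;
        }
        try {
            return Long.parseLong(metadata);
        } catch (NumberFormatException e) {
            return UNKNOWN;
        }
    }

    // Late-record condition from the description.
    static boolean isLate(long timestamp, long streamTime, long grace) {
        return timestamp < streamTime - grace;
    }

    public static void main(String[] args) {
        long grace = 5L;
        // r1(0), r2(5), r3(11) were processed before the restart.
        String committed = encode(11L);

        // With restored time: stream-time stays at 11 when r4(2) arrives.
        long restored = Math.max(decode(committed), 2L);
        System.out.println(isLate(2L, restored, grace)); // true: 2 < 11 - 5

        // Without metadata: stream-time restarts at UNKNOWN and advances to 2.
        long reset = Math.max(decode(null), 2L);
        System.out.println(isLate(2L, reset, grace)); // false: 2 < 2 - 5 does not hold
    }
}
```

With the partition-time restored from the committed metadata, `r4` is dropped 
after the restart just as it would be without one.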



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
