[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2020-01-03 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007697#comment-17007697
 ] 

Matthias J. Sax commented on KAFKA-7994:


Feel free to update the ticket – but we should create new tickets instead – 
this feature is shipped in 2.4 and for tracking purpose we should not reopen.

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Fix For: 2.4.0
>
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2020-01-02 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007124#comment-17007124
 ] 

Sophie Blee-Goldman commented on KAFKA-7994:


The gist of this ticket is "Partition-time, the max timestamp by partition, is 
not saved after a rebalance/restart. This partition-time is used to determine 
when to drop late records. Therefore, a rebalance/restart can cause 
non-deterministic results."

Only the first sentence of that was true and has been fixed. I'm fine with 
opening a new ticket to address the actual problem: that records are dropped 
based on the local processor-time, which is not saved after a 
rebalance/restart. But if we leave this closed we should at least update the 
ticket's description to reflect what has/hasn't been fixed, so it won't mislead 
users.

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Fix For: 2.4.0
>
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-12-31 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006224#comment-17006224
 ] 

Matthias J. Sax commented on KAFKA-7994:


This particular issue about stream time is resolved. We should create new 
tickets if there are other issues.

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Fix For: 2.4.0
>
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-09-19 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16933680#comment-16933680
 ] 

ASF GitHub Bot commented on KAFKA-7994:
---

mjsax commented on pull request #6694: [KAFKA-7994] Improve Stream time 
accuracy for restarts and rebalances 
URL: https://github.com/apache/kafka/pull/6694
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-09-05 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923359#comment-16923359
 ] 

Bruno Cadonna commented on KAFKA-7994:
--

I applied PR #6694 on the example I presented in my comment above. 
Unfortunately, it does not solve the issue, because the stream time handling is 
a bit intricate. Additionally to partition time, also a local processor time 
that depends on the partition time needs to be kept and made fail-safe to solve 
the issue. Although [~Yohan123]'s PR does not fix the issue, it is the 
important first step to get it fixed.

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-31 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16853388#comment-16853388
 ] 

Matthias J. Sax commented on KAFKA-7994:


Well. `suppress()` in not "affected" because `suppress()` itself does not have 
a bug – but you are correct, that if the bug hit, `suppress()` will emit 
duplicates (with this regard, is is affected – however, indirectly, because the 
lost partition time information itself does not break `suppress()` because 
suppress never uses this information). The upstream `aggregation()` uses the 
partition time information, thus, only `aggregate()` is affected directly 
(while `suppress()` issue is an indirect one). Does this make sense?

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-31 Thread Mateusz Owczarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16853118#comment-16853118
 ] 

Mateusz Owczarek commented on KAFKA-7994:
-

Bruno's example above indicates it actually does affect `suppress` operator in 
terms of producing only 'final window result'. For me the keyword 'final' 
suggests that one and only one result will be produced, whereas one simple 
rebalancing may cause multiple results of the same window.

Only recently I've been using `suppress` operator in my application and found 
out this behavior, so I've created a pull request for my Kafka Streams example 
repository describing the steps to simply reproduce multiple `suppress` results:
https://github.com/mowczare/kafka-streams-scala/pull/1

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-27 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16849022#comment-16849022
 ] 

Richard Yu commented on KAFKA-7994:
---

Hi Bruno. I think this one of the cases which this issue is working to fix. I 
think if my PR is merged, then this example you mentioned should no longer be a 
problem (time is reset correctly after restarts). 

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-27 Thread Bruno Cadonna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16848791#comment-16848791
 ] 

Bruno Cadonna commented on KAFKA-7994:
--

I run into a case where the reset of the stream time to -1 after a restart 
resulted in incorrect results. More specifically, an event that should have 
been dropped because the grace period was exceeded opened a window.

Suppose the following application:
{code:java}
builder
   .stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
   .groupByKey()
   
.windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ofMinutes(30)))
   ...
{code}

with the following input (numbers are timestamps)
m1, m2, m3, m61, m4, m90, m5 
application restart
m6, m150

With the reset of the stream time to -1 after restart, the application will 
create the following windows:
[0, 60), {m1, m2, m3, m4}
[0, 60), {m6}
[60, 120), {m61, m90}

Window [0, 60), {m6} is wrong because if there had not be a restart, stream 
time would be at 90 when m6 reaches the processor. Stream time 90 is outside 
the grace period and m6 should be dropped. However, with the restart stream 
time is reset to -1 and m6 is not dropped because the condition to drop events 
timestamp < stream-time - grace-period is not satisfied, i.e., 6 < -1 - 30. 
Hence, a second window [0, 60) is incorrectly opened.

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-26 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16848610#comment-16848610
 ] 

Richard Yu commented on KAFKA-7994:
---

cc [~mjsax] Made the change.

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-26 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16848607#comment-16848607
 ] 

Matthias J. Sax commented on KAFKA-7994:


Yes. Please rebase the PR to only include the "preserve partition time" fix. 
This helps to make progress. Thanks a lot.

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-26 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16848534#comment-16848534
 ] 

Richard Yu commented on KAFKA-7994:
---

Sure [~mjsax] I'm all for it, since it looks like the global stream time will 
be hard to define. I will create a new issue.

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.
> Notice that this particular issue applies for all Stream Tasks in the 
> topology. The further down the DAG records flow, the more likely it is that 
> the StreamTask will have an incorrect stream time. For instance, if r3 was 
> filtered out, the tasks receiving the processed records will compute the 
> stream time as 5 instead of the correct timestamp being 11. This entails us 
> to also propagate the latest observed partition time as well downstream.  
> That means the sources located at the head of the topology must forward the 
> partition time to its subtopologies whenever records are sent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-23 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846857#comment-16846857
 ] 

Matthias J. Sax commented on KAFKA-7994:


I think it might actually be worth to split the two issues into two tickets. 
One ticket to just preserve partition time over restarts (the original issue of 
this ticket), and do a new ticket for global stream time. There are still many 
open question what global stream time actually means and it will be a difficult 
and long design phase until we can merge any code. Hence, I would like to 
unblock the original issue of this ticket.

Thoughts?

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.
> Notice that this particular issue applies for all Stream Tasks in the 
> topology. The further down the DAG records flow, the more likely it is that 
> the StreamTask will have an incorrect stream time. For instance, if r3 was 
> filtered out, the tasks receiving the processed records will compute the 
> stream time as 5 instead of the correct timestamp being 11. This entails us 
> to also propagate the latest observed partition time as well downstream.  
> That means the sources located at the head of the topology must forward the 
> partition time to its subtopologies whenever records are sent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-21 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845381#comment-16845381
 ] 

Sophie Blee-Goldman commented on KAFKA-7994:


Hi [~Yohan123], I think we need to be careful in assuming a singular view of 
streamtime across tasks or even within a single task. Rather than it being an 
obstacle that different subtopologies can't "talk" to one another and pass 
along a single stream time, I think this actually enforces correctness – each 
node has its own sense of time and it doesn't make sense for them to look 
upstream for the time as seen by a different node. See 
[https://github.com/apache/kafka/pull/6278#|https://github.com/apache/kafka/pull/6278]

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.
> Notice that this particular issue applies for all Stream Tasks in the 
> topology. The further down the DAG records flow, the more likely it is that 
> the StreamTask will have an incorrect stream time. For instance, if r3 was 
> filtered out, the tasks receiving the processed records will compute the 
> stream time as 5 instead of the correct timestamp being 11. This entails us 
> to also propagate the latest observed partition time as well downstream.  
> That means the sources located at the head of the topology must forward the 
> partition time to its subtopologies whenever records are sent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-21 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16845338#comment-16845338
 ] 

Richard Yu commented on KAFKA-7994:
---

[~guozhang] [~mjsax] and [~vvcephei] I created a KIP for this issue so we could 
discuss this more in detail. Below is the link.

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-472%3A+%5BSTREAMS%5D+Add+partition+time+field+to+RecordContext]

Hope this could facilitate the process! :)

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.
> Notice that this particular issue applies for all Stream Tasks in the 
> topology. The further down the DAG records flow, the more likely it is that 
> the StreamTask will have an incorrect stream time. For instance, if r3 was 
> filtered out, the tasks receiving the processed records will compute the 
> stream time as 5 instead of the correct timestamp being 11. This entails us 
> to also propagate the latest observed partition time as well downstream.  
> That means the sources located at the head of the topology must forward the 
> partition time to its subtopologies whenever records are sent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-20 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843739#comment-16843739
 ] 

Matthias J. Sax commented on KAFKA-7994:


I am personally not convinced, that using headers is the best approach. For 
internal repartition topics it might be ok, but what about user topics (eg, 
`through()`)? For PAPI users, all topic are considered user topics and we might 
"pollute" the stream by adding headers.

Would be good to hear what other think about this issue?

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.
> Notice that this particular issue applies for all Stream Tasks in the 
> topology. The further down the DAG records flow, the more likely it is that 
> the StreamTask will have an incorrect stream time. For instance, if r3 was 
> filtered out, the tasks receiving the processed records will compute the 
> stream time as 5 instead of the correct timestamp being 11. This entails us 
> to also propagate the latest observed partition time as well downstream.  
> That means the sources located at the head of the topology must forward the 
> partition time to its subtopologies whenever records are sent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-19 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843540#comment-16843540
 ] 

Richard Yu commented on KAFKA-7994:
---

Ok, after digging around, I figured this much out:
 # Headers are also stored in ProducerRecords when they are sent over. The 
headers that are sent in the SinkNode are RecordContext's headers.
 # However, that does not mean the headers before they are serialized and put 
into a ProducerRecord, that they cannot be modified. What we can do is add an 
extra header to the headers we already have (i.e. add a {{new 
RecordHeader("partition time", Long.toByteArray(long timestamp))}}).
 # When StreamThread polls for ConsumerRecords to process, from the 
ConsumerRecords received, we would simply request for the header with the 
{{"partition time"}} string key. The value stored along side the key is our 
needed partition timestamp.

In effect, this does not require to modify RecordContext, and also allows a 
quick, cheap way of delivering partition times across tasks.

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.
> Notice that this particular issue applies for all Stream Tasks in the 
> topology. The further down the DAG records flow, the more likely it is that 
> the StreamTask will have an incorrect stream time. For instance, if r3 was 
> filtered out, the tasks receiving the processed records will compute the 
> stream time as 5 instead of the correct timestamp being 11. This entails us 
> to also propagate the latest observed partition time as well downstream.  
> That means the sources located at the head of the topology must forward the 
> partition time to its subtopologies whenever records are sent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-19 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843534#comment-16843534
 ] 

Richard Yu commented on KAFKA-7994:
---

Oh, just realized what was wrong with my approach. You are right: 
RecordContext's headers is probably the only way to get the partition time sent 
over. Starting a wiki page right now seems a good idea to discuss this more.

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.
> Notice that this particular issue applies for all Stream Tasks in the 
> topology. The further down the DAG records flow, the more likely it is that 
> the StreamTask will have an incorrect stream time. For instance, if r3 was 
> filtered out, the tasks receiving the processed records will compute the 
> stream time as 5 instead of the correct timestamp being 11. This entails us 
> to also propagate the latest observed partition time as well downstream.  
> That means the sources located at the head of the topology must forward the 
> partition time to its subtopologies whenever records are sent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-19 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843517#comment-16843517
 ] 

Matthias J. Sax commented on KAFKA-7994:


> So this might handle how partition time is transmitted.

Not sure what you mean by this? Tasks can exchange data only via repartition 
topics. To me, it's still unclear how an upstream task forwards it's partition 
time to a downstream task?

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.
> Notice that this particular issue applies for all Stream Tasks in the 
> topology. The further down the DAG records flow, the more likely it is that 
> the StreamTask will have an incorrect stream time. For instance, if r3 was 
> filtered out, the tasks receiving the processed records will compute the 
> stream time as 5 instead of the correct timestamp being 11. This entails us 
> to also propagate the latest observed partition time as well downstream.  
> That means the sources located at the head of the topology must forward the 
> partition time to its subtopologies whenever records are sent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-19 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843463#comment-16843463
 ] 

Richard Yu commented on KAFKA-7994:
---

Yeah, those are some nice points. ProcessorContext is indeed restricted to a 
single task, so I did some thinking and I realized that RecordContext does not 
necessarily have to be the delivery package for partition time. The 
ProcessorNode (which is by the way not visible to Kafka users) itself can be 
used to carry relevant partition time data, so no KIP should be needed if we 
only modify this part of KafkaStreams.

If you were to look in ProcessorContextImpl, you would realize that both the 
parent and child node are available for reference when forwarding a message. I 
realized that during this forwarding process, we could also transmit a parent 
node's partition time to the child node's. (so in other words, ProcessorNode 
stores the relevant time metadata). ProcessorNode could easily be queried by 
the StreamTask. Using this approach, no pubic API technically will be touched. 
So this might handle how partition time is transmitted.

As for how a downstream task handle the information, one possibility might be 
to take the biggest partition times out of all the ones forwarded and use that 
one, but that could prove to be too greedy. I'm not sure about that part at the 
moment.

 

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.
> Notice that this particular issue applies for all Stream Tasks in the 
> topology. The further down the DAG records flow, the more likely it is that 
> the StreamTask will have an incorrect stream time. For instance, if r3 was 
> filtered out, the tasks receiving the processed records will compute the 
> stream time as 5 instead of the correct timestamp being 11. This entails us 
> to also propagate the latest observed partition time as well downstream.  
> That means the sources located at the head of the topology must forward the 
> partition time to its subtopologies whenever records are sent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-19 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843402#comment-16843402
 ] 

Matthias J. Sax commented on KAFKA-7994:


Adding partition time to the context might be a good idea. I was thinking about 
this in the past already. We might want to extend public interface 
`RecordContext` to add partition and/or stream time. (This would require a KIP 
though.) It would be good information for users at PAPI level.
{quote}So when a record is forwarded and is received, the next node/StreamTask 
will also have their ProcessorContext updated with its partition time as well 
by the upstream processor.
{quote}
How would that work? Atm, the context's life-cycle is limited to a single task 
and not forwarded through topics.

What is also unclear: how would a downstream task process this information? A 
single task may have multiple upstream tasks, and those upstream tasks, could 
report different partition times (also not necessarily monotonically 
increasing). It's unclear to me, how this should be handled?

Thinking about all this, I am wondering if it might make sense to start a wiki 
page (new sub page in 
[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams]) to discuss 
this issue? It would be a great way to document the semantics and internals for 
users? \cc [~guozhang] [~vvcephei]

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.
> Notice that this particular issue applies for all Stream Tasks in the 
> topology. The further down the DAG records flow, the more likely it is that 
> the StreamTask will have an incorrect stream time. For instance, if r3 was 
> filtered out, the tasks receiving the processed records will compute the 
> stream time as 5 instead of the correct timestamp being 11. This entails us 
> to also propagate the latest observed partition time as well downstream.  
> That means the sources located at the head of the topology must forward the 
> partition time to its subtopologies whenever records are sent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-18 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843223#comment-16843223
 ] 

Richard Yu commented on KAFKA-7994:
---

Oh, just realized that there should be more changes. Sorry, didn't realize the 
scope of the big picture. 

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
>  
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-14 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16839868#comment-16839868
 ] 

Richard Yu commented on KAFKA-7994:
---

Hi all, could we get some reviews on the PR? There hasn't been too much 
activity as of late, so I thought it would be good if we could get some 
thoughts.  

 

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
>  
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835192#comment-16835192
 ] 

ASF GitHub Bot commented on KAFKA-7994:
---

ConcurrencyPractitioner commented on pull request #6694: [KAFKA-7994] Improve 
Stream time accuracy for restarts and rebalances 
URL: https://github.com/apache/kafka/pull/6694
 
 
   The issue for this PR could be found here:
   
https://issues.apache.org/jira/browse/KAFKA-7994?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)
   
   As noted in the JIRA description, stream time is incorrectly set to -1 after 
rebalances and restarts. To help resolve this issue, a straightforward approach 
is to commit the individual partition time with last message processed for each 
RecordQueue. Hence, after a restart, we could set the partition time to the 
last committed partition time. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
>  
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-05-06 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833952#comment-16833952
 ] 

Richard Yu commented on KAFKA-7994:
---

Alright, so I posted a small patch which might help resolve this issue. Will 
open a PR soon as well.

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
>  
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-04-20 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16822589#comment-16822589
 ] 

Richard Yu commented on KAFKA-7994:
---

Oh, I'm fine doing some digging though. I might try to take a hack at it, but 
thanks for the heads up!

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
>  
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-04-20 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16822535#comment-16822535
 ] 

Matthias J. Sax commented on KAFKA-7994:


It's a little unclear how to tackle the issue atm, and it might turn out to be 
rather complex... 

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
>  
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-04-19 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16822327#comment-16822327
 ] 

Richard Yu commented on KAFKA-7994:
---

[~mjsax] Do you think it would be fine if I took this issue?

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
>  
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-03-05 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16785150#comment-16785150
 ] 

Guozhang Wang commented on KAFKA-7994:
--

| It wouldn't be terrible if we sometimes held on to records a little longer 
than necessary, but actually, I think this won't happen either.

Hmm I think there's still a risk for this, depending on how users think of 
suppression semantics: back to my example in the first comment, and consider a 
windowed count with length 10 and grace 0, users would expect that a final 
result of window (0,10) be sent when r3 is processed with value 2, but they 
would not see the result even after r4's processed since the ST is only 2 at 
that sub-topology, right?

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
>  
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-03-05 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16785144#comment-16785144
 ] 

Guozhang Wang commented on KAFKA-7994:
--

Just to add: even in the KIP to add negative timestamp support, we will still 
preserve -1 for special meanings and trade that we cannot represent that 
specific ms in history :)

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
>  
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-03-01 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782206#comment-16782206
 ] 

Matthias J. Sax commented on KAFKA-7994:


Thanks [~vvcephei]. Sounds convincing! Glad that suppress() is not subject to 
this issue.

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
>  
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-03-01 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782178#comment-16782178
 ] 

John Roesler commented on KAFKA-7994:
-

I did some mental math when I made that last change to stream time. It's funny, 
I keep proving the math to myself, but I can't shake the feeling that there 
might be something wrong with it... Thanks for bringing this up, the more we go 
over it, the better.

 

It's always possible I've made an arithmetic error, but I don't think we'll 
ever emit earlier than necessary, and we shouldn't emit later than necessary 
either.

The stream-time variable is initialized to -1, which is smaller than any record 
timestamp (because we're not considering negative timestamps yet), so it's also 
smaller than any "real" stream time.

Records are only emitted from the suppression when a certain time passes 
*after* the record time. I.e., (ST is stream time, BR is a buffered record, D 
is suppression duration) when ST >= BR.time + D . Neither BR.time nor D can be 
negative, so that expression is never true for ST = -1 (the initial, "unknown 
time", value). Therefore, resetting stream time to -1 at any point is safe, in 
that it can never cause premature evictions.

 

It wouldn't be terrible if we sometimes held on to records a little longer than 
necessary, but actually, I think this won't happen either.

Stream time advances when we process new records through the Suppress 
processor, and the processing of a new record isn't complete until we emit all 
of the buffered records that should be emitted as a result of the stream time 
advancement that record causes. Therefore, we have an invariant that, if the 
processing of record R is complete, ST >= R.time and the buffer contains only 
buffered records BR such that BR.time > ST - D .

Now, if we restart after processing R, we'll forget all about that prior stream 
time, and just take the next record's (NR) timestamp as the new stream time. 
NR.time can either be larger or smaller than the previous ST. If it's larger, 
it's equivalent to a normal increment of stream time, and all the normal logic 
applies. The unusual situation is that if NR.time is smaller than the previous 
ST. This means the new stream time ST' is smaller than the prior stream time. 
If we hadn't had the "reset", what would have happened? Since ST' = max(ST, 
NR.time) and NR.time < ST, we get ST' = ST. The stream time doesn't advance at 
all, and we should not emit anything.

So whether we reset the stream time to -1 or preserve the prior ST, the exact 
same records will be emitted or not.

 

I wrote the above out long form to try and eliminate any hand-waving from my 
thinking about this. Is there any problem with the reasoning? If not, then we 
have a proof that resetting the stream time to -1 has no impact on suppress. 

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
>  
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time 

[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-02-26 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16778810#comment-16778810
 ] 

Matthias J. Sax commented on KAFKA-7994:


{quote}So I'm feeling that if we can tackle those issues within a single 
solution, then maybe we can consider propagating the stream time inferred at 
the "head" sub-topologies, plus storing it at the offset metadata.
{quote}
This might be helpful. I actually believe, that `suppress()` might face a 
similar issue atm. \cc [~vvcephei] What is the impact on `suppress()` on 
restarts? Could it be that we emit incorrectly for this case?

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
>  
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-02-26 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16778460#comment-16778460
 ] 

Guozhang Wang commented on KAFKA-7994:
--

Thanks for brining this up! I've also thought about this lately, and another 
semi-related issue is that the inferred partition/stream time is inconsistency 
across sub-topologies today because each task independently infer that based on 
its own incoming records' timestamp. As of today this is sorta by-design since 
each task is executed independently, but still there are shortcomings like 
inter-state-store consistency and global-consistent savepoints. Take your 
example in the desciprtion, when we've processed up to r4 our current inferred 
stream time is 11 already, but imagine this sub-topology will eventually have a 
filter processor followed by a sink sending to an intermediate topic, and r2 
and r3 are filtered, then in the down stream sub-topology we will only see the 
the processed results of r1 and r4 (and their inherited record timestamp) and 
then infer the stream time as 2.

So I'm feeling that if we can tackle those issues within a single solution, 
then maybe we can consider propagating the stream time inferred at the "head" 
sub-topologies, plus storing it at the offset metadata.

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to make decisions 
> about processing out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In net effect, we forget 
> current stream-time for this case what may lead to non-deterministic behavior 
> if we stop processing right before a late record, that would be dropped if we 
> continue processing, but is not dropped after rebalance/restart. Let's look 
> at an examples with a grade period of 5ms for a tumbling windowed of 5ms, and 
> the following records (timestamps in parenthesis):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11  and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> rebalance. The problem is, that stream-time does advance differently from a 
> global point of view: 0, 5, 11, 2.
>  
> Note, this is a corner case, because if we would stop processing one record 
> earlier, ie, after processing `r2` but before processing `r3`, stream-time 
> would be advance correctly from a global point of view.
> A potential fix would be, to store latest observed partition-time in the 
> metadata of committed offsets. Thus way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)