[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts
[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015410#comment-16015410 ]

Sinóros-Szabó Péter commented on KAFKA-3514:

Considering the above ideas, I would prefer stream time and punctuation to work in the following way:

1. Stream time is advanced before the message is processed (not after, as happens now), so that the message is processed in the appropriate punctuation interval.
2. `punctuate()` is called not with the current stream time, but with the timestamp `first_punctuation_timestamp + X * scheduled_time` at each iteration X, so that consecutive punctuations always see a time difference equal to the scheduled interval, even those called right after each other because there were no messages for a while. (Currently the real difference varies between 0 and 2x the interval when message traffic is low.)
3. Consider running the missed punctuations when the application starts up, maybe as an additional option for `context.schedule()`.

What is your opinion on this?

> Stream timestamp computation needs some further thoughts
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Eno Thereska
> Labels: architecture
> Fix For: 0.11.0.0
>
> Our current stream task's timestamp is used for the punctuate function as well as for selecting which stream to process next (i.e. best-effort stream synchronization). It is defined as the smallest timestamp over all partitions in the task's partition group. This results in two unintuitive corner cases:
>
> 1) Observing a late-arriving record keeps that stream's timestamp low for a period of time, so that stream keeps being processed until the late record is reached. For example, take two partitions within the same task, annotated by their timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late-arriving record with timestamp "1" will cause stream A to be selected continuously in the thread loop, i.e. messages with timestamps 5, 6, 7, 8, 9, until the record itself is dequeued and processed; then stream B will be selected, starting with timestamp 2.
>
> 2) An empty buffered partition will cause its timestamp not to be advanced, and hence the task timestamp as well, since it is the smallest among all partitions. This may not be a severe problem compared with 1) above, though.
>
> *Update*
> There is one more thing to consider (full discussion found here: http://search-hadoop.com/m/Kafka/uyzND1iKZJN1yz0E5?subj=Order+of+punctuate+and+process+in+a+stream+processor)
> {quote}
> Let's assume the following case.
> - a stream processor that uses the Processor API
> - context.schedule(1000) is called in the init()
> - the processor reads only one topic that has one partition
> - a custom timestamp extractor is used, but its timestamp is just wall-clock time
>
> Imagine the following events:
> 1. For 10 seconds I send 5 messages / second.
> 2. I do not send any messages for 3 seconds.
> 3. I start sending 5 messages / second again.
>
> I see that punctuate() is not called during the 3 seconds when I do not send any messages. This is OK according to the documentation, because there are no new messages to trigger the punctuate() call. When the first few messages arrive after I restart sending (point 3 above), I see the following sequence of method calls:
> 1. process() on the 1st message
> 2. punctuate() is called 3 times
> 3. process() on the 2nd message
> 4. process() on each following message
>
> What I would expect instead is that punctuate() is called first and then process() is called on the messages, because the first message's timestamp is already 3 seconds later than the time of the last punctuate() call, so the first message belongs after the 3 punctuate() calls.
> {quote}

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
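Point 2 of the proposal in the comment above (and, loosely, the catch-up in point 3) can be illustrated with a small sketch. This is only an illustration of the arithmetic, not Kafka Streams code: the function name `due_punctuations` and its parameters are hypothetical, and timestamps are plain integers in milliseconds.

```python
def due_punctuations(first_punctuation_ts, interval, already_fired, stream_time):
    """Return the timestamps that would be passed to punctuate().

    Hypothetical helper: the X-th punctuation always gets the timestamp
    first_punctuation_ts + X * interval, so consecutive punctuations are
    exactly `interval` apart even after a gap without messages.
    `already_fired` is the number of punctuations that have run so far.
    """
    if interval <= 0:
        raise ValueError("interval must be positive")
    due = []
    x = already_fired
    while first_punctuation_ts + x * interval <= stream_time:
        due.append(first_punctuation_ts + x * interval)
        x += 1
    return due


# Ten punctuations (1000 .. 10000) have fired, then no messages arrive for
# about 3 seconds and stream time jumps to 13200.
print(due_punctuations(1000, 1000, 10, 13200))
```

With these inputs the three missed punctuations are reported with timestamps one interval apart, instead of all three seeing the same current stream time.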
[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts
[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010813#comment-16010813 ]

Guozhang Wang commented on KAFKA-3514:

Thanks [~mihbor]
[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts
[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009365#comment-16009365 ]

Michal Borowiecki commented on KAFKA-3514:

I've created KAFKA-5233 to track work related to KIP-138. As noted above, the considerations on this ticket extend beyond the scope of KIP-138, which is agnostic to how stream time gets advanced.
[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts
[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16008459#comment-16008459 ]

Matthias J. Sax commented on KAFKA-3514:

This JIRA also relates to this user question: http://search-hadoop.com/m/uyzND1iKZJN1yz0E5=Order+of+punctuate+and+process+in+a+stream+processor

We should take scheduled punctuations into account when advancing "stream time", rather than advancing it only in coarse-grained steps driven by record timestamps.
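The ordering question raised in that user thread can be reproduced with a toy single-partition model. This is a sketch under stated assumptions, not the Kafka Streams implementation: the function `trace`, its parameters, and the flag `advance_before_process` are hypothetical names, and timestamps are plain millisecond integers.

```python
def trace(record_timestamps, interval, next_punctuation, advance_before_process):
    """Return the sequence of calls made by a toy single-partition task.

    advance_before_process=False models stream time being advanced only
    after a record has been processed; True models advancing stream time
    (and firing due punctuations) before the record is processed.
    """
    calls = []
    for number, ts in enumerate(record_timestamps, start=1):
        if not advance_before_process:
            calls.append(f"process #{number}")
        # Stream time is now `ts`: fire every punctuation that has become due.
        while next_punctuation <= ts:
            calls.append("punctuate")
            next_punctuation += interval
        if advance_before_process:
            calls.append(f"process #{number}")
    return calls


# Last punctuation fired at 10000, interval 1000, then a 3-second gap.
print(trace([13200, 13400], 1000, 11000, advance_before_process=False))
print(trace([13200, 13400], 1000, 11000, advance_before_process=True))
```

The first call reproduces the reported sequence (process, three punctuations, process); the second gives the order the user expected, with the three punctuations ahead of the first record.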
[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts
[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15990288#comment-15990288 ]

Michal Borowiecki commented on KAFKA-3514:

Agreed. That's what I meant. Beyond the scope of KIP-138 and let's keep it that way.
[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts
[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15990281#comment-15990281 ]

Matthias J. Sax commented on KAFKA-3514:

Your observation is correct, and this ticket is exactly about the issue you describe. There are two problems with the current design: (1) it's hard to reason about processing order, and (2) there is some non-determinism. But I think it does not affect KIP-138 (although I agree that the two are connected). KIP-138 is about when and how to punctuate, and the punctuation strategy should be agnostic to the way "stream time" gets advanced.
[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts
[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15990176#comment-15990176 ]

Michal Borowiecki commented on KAFKA-3514:

I think the description of this ticket is missing an important detail. If my understanding is correct, it will behave as described if all the records arrive in a single batch. However, if the records preceding the record with timestamp "1" come in a separate batch (I'll use brackets to depict batch boundaries):
{code}
Stream A: [5, 6, 7, 8, 9], [1, 10]
Stream B: [2, 3, 4, 5]
{code}
then initially the timestamp for stream A is going to be set to "5" (the minimum of the first batch), and since it is not allowed to move back, the second batch containing the late-arriving record "1" is not going to change that. Stream B is going to be drained first until "5".

However, if the batch boundaries differ by just one record and the late-arriving "1" is in the first batch:
{code}
Stream A: [5, 6, 7, 8, 9, 1], [10]
Stream B: [2, 3, 4, 5]
{code}
then it's going to behave as currently described.

Please correct me if I got this wrong. But if that is the case, it feels all too non-deterministic, and I think the timestamp computation deserves further thought beyond the scope of [KIP-138|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics], which is limited to punctuate semantics and does not cover stream time semantics in general.
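The batch-boundary effect described in this comment can be checked with a toy model. It is a sketch of the behaviour as the comment understands it, not the actual Kafka Streams classes: `Partition`, `drain` and `run` are hypothetical names, the partition timestamp is assumed to be the minimum of the buffered records but never allowed to move backwards, ties are broken by partition name, and all batches are buffered before processing starts.

```python
from collections import deque


class Partition:
    """Toy buffered partition whose timestamp never moves backwards."""

    def __init__(self, name):
        self.name = name
        self.buffer = deque()
        self.timestamp = None

    def _refresh(self):
        # Assumed rule: minimum buffered timestamp, but monotonic.
        if self.buffer:
            lowest = min(self.buffer)
            if self.timestamp is None or lowest > self.timestamp:
                self.timestamp = lowest

    def add_batch(self, timestamps):
        self.buffer.extend(timestamps)
        self._refresh()

    def poll(self):
        ts = self.buffer.popleft()
        self._refresh()
        return ts


def drain(partitions):
    """Repeatedly process the head record of the lowest-timestamp partition."""
    order = []
    while True:
        ready = [p for p in partitions if p.buffer]
        if not ready:
            return order
        chosen = min(ready, key=lambda p: (p.timestamp, p.name))
        order.append((chosen.name, chosen.poll()))


def run(batches_a, batches_b):
    a, b = Partition("A"), Partition("B")
    for batch in batches_a:
        a.add_batch(batch)
    for batch in batches_b:
        b.add_batch(batch)
    return drain([a, b])


print(run([[5, 6, 7, 8, 9], [1, 10]], [[2, 3, 4, 5]]))  # B is drained first
print(run([[5, 6, 7, 8, 9, 1], [10]], [[2, 3, 4, 5]]))  # A is selected first
```

Moving the batch boundary by a single record flips which stream is processed first, which is the non-determinism the comment points out.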
[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts
[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926839#comment-15926839 ]

Matthias J. Sax commented on KAFKA-3514:

Thanks for your input. It's an interesting approach. We plan to do a KIP for this change, as it will be tricky to get right. It would be great if you would participate in the discussion on the dev list. I am not sure at the moment when the KIP will be pushed. (See https://github.com/apache/kafka/pull/1689#issuecomment-286523692)
[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts
[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15926036#comment-15926036 ]

Arun Mathew commented on KAFKA-3514:

Hi [~mjsax] [~mihbor]. We were building an audit trail for Kafka based on Kafka Streams and encountered a similar issue. Our workaround was to use a hybrid of event time and system time. During regular operation we use event time. But when we create a punctuation schedule (the object representing the next punctuation time), we also record the system time at which the schedule was created. The punctuate code was modified to punctuate anyway if the interval specified in the punctuation schedule has elapsed in terms of system time (current time - time of punctuation schedule creation). In that case a new punctuation schedule corresponding to the next expected punctuation time (current punctuation time + punctuation interval) is also created.

In an earlier version of Kafka the above logic sufficed, as mayBePunctuate was called as part of polling for events (even in the absence of events). The current version doesn't seem to call it, so we had to patch that portion a bit too. Please let me know your thoughts.
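The hybrid workaround described in this comment can be sketched roughly as follows. This is a reading of the prose, not the commenter's patch: `PunctuationSchedule`, `is_due`, `following` and `maybe_punctuate` are hypothetical names, and both clocks are passed in as plain millisecond integers so the example stays deterministic.

```python
from dataclasses import dataclass


@dataclass
class PunctuationSchedule:
    punctuation_time: int  # event time at which the next punctuation is expected
    interval: int          # punctuation interval
    created_at: int        # system time when this schedule object was created

    def is_due(self, stream_time, system_time):
        by_event_time = stream_time >= self.punctuation_time
        # Fallback: punctuate anyway once a full interval of system time
        # has elapsed since this schedule was created.
        by_system_time = system_time - self.created_at >= self.interval
        return by_event_time or by_system_time

    def following(self, system_time):
        # Next expected punctuation = current punctuation time + interval.
        return PunctuationSchedule(
            self.punctuation_time + self.interval, self.interval, system_time
        )


def maybe_punctuate(schedule, stream_time, system_time, punctuate):
    """Call `punctuate` if the schedule is due; return the schedule to use next."""
    if not schedule.is_due(stream_time, system_time):
        return schedule
    punctuate(schedule.punctuation_time)
    return schedule.following(system_time)


fired = []
schedule = PunctuationSchedule(punctuation_time=1000, interval=1000, created_at=0)
schedule = maybe_punctuate(schedule, 500, 400, fired.append)    # nothing due yet
schedule = maybe_punctuate(schedule, 500, 1000, fired.append)   # system time elapsed
schedule = maybe_punctuate(schedule, 2500, 1100, fired.append)  # event time reached
print(fired, schedule.punctuation_time)
```

The second call fires although event time has not advanced, which is the case the workaround targets; such a check only helps if something invokes it while no records are arriving, as the comment notes.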
[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts
[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897891#comment-15897891 ]

Guozhang Wang commented on KAFKA-3514:

There are some discussions on pros and cons of changing the punctuate semantics: https://github.com/apache/kafka/pull/1689
[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts
[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897771#comment-15897771 ]

Michal Borowiecki commented on KAFKA-3514:

Oh, I wouldn't mind that at all. Just thought that you wanted to stick to event time semantics for this, but if you're not precious about that then I'm all for it :)
[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts
[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897729#comment-15897729 ]

Matthias J. Sax commented on KAFKA-3514:

[~mihbor] I am not sure about this. To me, changing punctuate semantics back to system time seems to be the cleaner solution.
[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts
[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897413#comment-15897413 ]

Michal Borowiecki commented on KAFKA-3514:

Thank you for responding. Just now I had a thought about the semantics of event time. It is already possible to provide a TimestampExtractor that determines what the event time is, given a message. It's not far-fetched to assume that a user would also want a way to specify what the event time is in the absence of messages (on one or more input partitions), possibly by providing an implementation other than what PartitionGroup.timestamp() does based on the timestamps of its partitions.
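One way to picture the idea of a user-supplied rule for event time in the absence of messages is a small pluggable policy. The sketch below is purely hypothetical: TaskTimePolicy and TaskTimePolicies do not exist in Kafka Streams, and the convention that a negative timestamp means "no record seen yet" is an assumption of the sketch.

```java
import java.util.Collection;
import java.util.Collections;

/** Hypothetical extension point; not part of Kafka Streams. */
interface TaskTimePolicy {
    /**
     * @param partitionTimes latest known timestamp per partition;
     *                       a negative value means "no record seen yet" (assumption)
     * @param wallClockMs    current system time, available as a fallback
     */
    long taskTime(Collection<Long> partitionTimes, long wallClockMs);
}

final class TaskTimePolicies {
    private TaskTimePolicies() { }

    /** Smallest partition timestamp wins, as described in the issue. */
    static final TaskTimePolicy MIN_OVER_PARTITIONS =
        (times, wallClockMs) -> Collections.min(times);

    /** Ignores partitions without records; falls back to the wall clock if none has any. */
    static final TaskTimePolicy MIN_OVER_ACTIVE =
        (times, wallClockMs) -> times.stream()
            .filter(t -> t >= 0)
            .min(Long::compare)
            .orElse(wallClockMs);
}
```

Under these assumptions, a task with one active and one empty partition would keep advancing with MIN_OVER_ACTIVE, whereas MIN_OVER_PARTITIONS stays pinned to the empty partition.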
[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts
[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897381#comment-15897381 ]

Eno Thereska commented on KAFKA-3514:

[~mihbor] I understand now. I had thought the effect was limited, but what you say makes sense. Thanks.
[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts
[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897371#comment-15897371 ]

Michal Borowiecki commented on KAFKA-3514:

Hi [~enothereska], I have to disagree. It is perfectly clear to me (from the documentation) that punctuate is based on event time, not system time. However, the problem is that event time is not advanced reliably, since a single input stream that doesn't receive messages will prevent event time from advancing. In an extreme case of a poorly partitioned topic, I can imagine that some partition may never get a message. That would cause a topology that has that partition as input to never advance event time, and hence never fire punctuate, regardless of the presence of messages on its other input topics. In my opinion, if the purpose of punctuate is to perform periodic operations, then this flaw makes it unfit for that purpose.
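The stall described in this comment can be reproduced with a toy model of a task whose time is the minimum over its partitions. This is a sketch under stated assumptions, not the actual PartitionGroup code: the class MinStreamTimeSketch, the UNKNOWN marker and the partition names are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a task whose time is the minimum over its partitions' timestamps. */
final class MinStreamTimeSketch {
    /** Hypothetical marker: the partition has never received a record. */
    static final long UNKNOWN = -1L;

    private final Map<String, Long> partitionTime = new HashMap<>();

    MinStreamTimeSketch(String... partitions) {
        for (String p : partitions) {
            partitionTime.put(p, UNKNOWN);
        }
    }

    /** Record arrival: a partition's time only moves forward. */
    void observe(String partition, long timestamp) {
        partitionTime.merge(partition, timestamp, Math::max);
    }

    /** Task time is the smallest partition time; UNKNOWN while any partition is empty. */
    long taskTime() {
        long min = Long.MAX_VALUE;
        for (long t : partitionTime.values()) {
            if (t == UNKNOWN) {
                return UNKNOWN;
            }
            min = Math.min(min, t);
        }
        return min;
    }

    public static void main(String[] args) {
        MinStreamTimeSketch task = new MinStreamTimeSketch("busy-0", "idle-0");
        task.observe("busy-0", 1_000L);
        task.observe("busy-0", 60_000L);
        System.out.println(task.taskTime()); // idle-0 has no records, so time has not advanced
        task.observe("idle-0", 2_000L);
        System.out.println(task.taskTime()); // now bounded by idle-0
    }
}
```

However many records arrive on busy-0, the task time in this model only moves once idle-0 receives a record, and even then it is capped by that partition's timestamp.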
[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts
[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897290#comment-15897290 ]

Eno Thereska commented on KAFKA-3514:

[~mihbor] Item 2) in the issue description is slightly different from the problem you are describing (it seems to me). I think the issue you are describing is that punctuate is based on event time, not system time.
[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts
[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683056#comment-15683056 ]

Michal Borowiecki commented on KAFKA-3514:

IMO, item 2) in the issue description *is* a severe problem. Punctuate methods (as described by their API) are meant to perform periodic operations. As it currently stands, if any of the input topics doesn't receive messages regularly, the punctuate method won't be called regularly either (because the minimum timestamp across all partitions does not advance), which violates what the API promises. We've worked around it in our app by creating an independent stream and a scheduler that regularly sends ticks to an input topic of a Transformer, so that its punctuate method is called predictably, but this is far from ideal.
[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts
[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15562785#comment-15562785 ]

David J. Garcia commented on KAFKA-3514:

Maybe just use max(all_partitions_ts) instead of min?
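For comparison, the two aggregations can be put side by side. This is only an illustrative sketch: the class MinVersusMax and the sample timestamps are made up, and it says nothing about how Kafka Streams would actually implement such a change.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Illustrative comparison of min- and max-based task time; hypothetical helper. */
final class MinVersusMax {
    private MinVersusMax() { }

    /** Task time bounded by the slowest partition. */
    static long minTime(List<Long> partitionTimes) {
        return Collections.min(partitionTimes);
    }

    /** Task time driven by the fastest partition. */
    static long maxTime(List<Long> partitionTimes) {
        return Collections.max(partitionTimes);
    }

    public static void main(String[] args) {
        // Hypothetical snapshot: one partition has reached 9000, the other is stuck at 1000.
        List<Long> partitionTimes = Arrays.asList(9_000L, 1_000L);
        System.out.println("min: " + minTime(partitionTimes));
        System.out.println("max: " + maxTime(partitionTimes));
    }
}
```

Using the maximum keeps time moving as long as any partition receives records, at the cost that task time may run ahead of records still waiting on slower partitions.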