[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-05-18 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16015410#comment-16015410
 ] 

Sinóros-Szabó Péter commented on KAFKA-3514:


Considering the above ideas, I would prefer stream time and punctuation to 
work in the following way:
1., stream time is advanced before the message is processed (not after, as 
happens now), so the message is processed in the appropriate punctuation 
interval
2., call `punctuate()` not with the current stream time but with the timestamp 
computed as `first_punctuation_timestamp + X * scheduled_time` at each 
iteration, so that consecutive punctuations always see a time difference 
equal to the scheduled interval, even those called right after each other 
because no messages arrived for a while. (Currently the actual difference 
varies between 0 and 2x the interval when message traffic is low.)
3., consider running the missed punctuations when the application starts up, 
maybe as an additional option for `context.schedule()`

What is your opinion on this?
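
To make items 1 and 2 concrete, here is a minimal sketch of the proposed 
bookkeeping. It is not Kafka Streams code: the class `FixedGridPunctuation` 
and its method `advanceTo` are hypothetical names used only for illustration. 
The idea is that stream time is advanced first, and every due punctuation is 
reported with its own grid timestamp rather than with the current stream time.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration only; not part of Kafka Streams. */
final class FixedGridPunctuation {
    private final long intervalMs;
    private long nextPunctuationMs;

    FixedGridPunctuation(long firstPunctuationMs, long intervalMs) {
        this.intervalMs = intervalMs;
        this.nextPunctuationMs = firstPunctuationMs;
    }

    /**
     * Advance stream time to the given value (to be called before the record
     * is processed) and return the timestamps of all punctuations that are
     * now due. Each one equals first_punctuation_timestamp + X * interval.
     */
    List<Long> advanceTo(long streamTimeMs) {
        List<Long> due = new ArrayList<>();
        while (nextPunctuationMs <= streamTimeMs) {
            due.add(nextPunctuationMs);
            nextPunctuationMs += intervalMs;
        }
        return due;
    }
}
```

With a 1000 ms interval and a first punctuation at 1000, a record at stream 
time 3200 arriving after a gap would yield punctuation timestamps 1000, 2000 
and 3000, each exactly one interval apart, before the record itself is 
processed.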

> Stream timestamp computation needs some further thoughts
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Eno Thereska
> Labels: architecture
> Fix For: 0.11.0.0
>
> Our current stream task's timestamp is used for the punctuate function as 
> well as for selecting which stream to process next (i.e. best-effort stream 
> synchronization). It is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late-arriving record keeps that stream's timestamp low for 
> a period of time, and hence that stream keeps being processed until the 
> late record is reached. For example, take two partitions within the same 
> task, annotated by their timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late-arriving record with timestamp "1" will cause stream A to be 
> selected continuously in the thread loop, i.e. messages with timestamps 
> 5, 6, 7, 8, 9 are processed until the late record itself is dequeued and 
> processed; only then will stream B be selected, starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp not to advance, 
> and hence the task timestamp will not advance either, since it is the 
> smallest among all partitions. This may not be as severe a problem as 1) 
> above, though.
> *Update*
> There is one more thing to consider (full discussion found here: 
> http://search-hadoop.com/m/Kafka/uyzND1iKZJN1yz0E5?subj=Order+of+punctuate+and+process+in+a+stream+processor)
> {quote}
> Let's assume the following case.
> - a stream processor that uses the Processor API
> - context.schedule(1000) is called in the init()
> - the processor reads only one topic that has one partition
> - using a custom timestamp extractor, but that timestamp is just the wall 
> clock time
> Imagine the following events:
> 1., for 10 seconds I send in 5 messages / second
> 2., I do not send any messages for 3 seconds
> 3., I start sending 5 messages / second again
> I see that punctuate() is not called during the 3 seconds when I do not 
> send any messages. This is ok according to the documentation, because 
> there are no new messages to trigger the punctuate() call. When the 
> first few messages arrive after sending restarts (point 3. above), I 
> see the following sequence of method calls:
> 1., process() on the 1st message
> 2., punctuate() is called 3 times
> 3., process() on the 2nd message
> 4., process() on each following message
> What I would expect instead is that punctuate() is called first and then 
> process() is called on the messages, because the first message's timestamp 
> is already 3 seconds later than the last punctuate() call, so the 
> first message belongs after the 3 punctuate() calls.
> {quote}
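
The call order reported in the quote above, and the order the reporter 
expected, can be reproduced with a small toy model. This is only a sketch 
under stated assumptions: `PunctuateOrderModel` and its `run` method are 
hypothetical names, the model has a single partition, and it does not reflect 
Kafka Streams internals beyond "punctuations catch up once a record advances 
stream time".

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model for illustration only; not Kafka Streams code. */
final class PunctuateOrderModel {
    /**
     * Replays records with the given timestamps (ms) and logs the calls.
     * If punctuateFirst is false, the record is processed before the pending
     * punctuations are caught up (the reported order); if true, punctuations
     * are caught up before the record is processed (the expected order).
     */
    static List<String> run(long[] recordTimestampsMs, long intervalMs,
                            long nextPunctuationMs, boolean punctuateFirst) {
        List<String> calls = new ArrayList<>();
        long next = nextPunctuationMs;
        for (long ts : recordTimestampsMs) {
            if (!punctuateFirst) {
                calls.add("process@" + ts);
            }
            // Catch up on every punctuation that is due at this stream time.
            while (next <= ts) {
                calls.add("punctuate");
                next += intervalMs;
            }
            if (punctuateFirst) {
                calls.add("process@" + ts);
            }
        }
        return calls;
    }
}
```

For example, with a 1000 ms interval, the next punctuation due at 11000 and 
records at 13000 and 13200 after a 3-second gap, 
`run(new long[]{13000L, 13200L}, 1000L, 11000L, false)` logs process, three 
punctuate calls, then process, matching the reported sequence.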



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-05-15 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16010813#comment-16010813
 ] 

Guozhang Wang commented on KAFKA-3514:
--

Thanks [~mihbor]



[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-05-13 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16009365#comment-16009365
 ] 

Michal Borowiecki commented on KAFKA-3514:
--

I've created KAFKA-5233 to track work related to KIP-138. As noted above, the 
considerations on this ticket span beyond the scope of KIP-138, which is 
agnostic to how the stream time gets advanced.



[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-05-12 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16008459#comment-16008459
 ] 

Matthias J. Sax commented on KAFKA-3514:


This JIRA also relates to this user question: 
http://search-hadoop.com/m/uyzND1iKZJN1yz0E5=Order+of+punctuate+and+process+in+a+stream+processor

We should take scheduled punctuations into account when advancing "stream 
time", rather than advancing "stream time" only in coarse-grained steps 
driven by record timestamps.



[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-04-30 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15990288#comment-15990288
 ] 

Michal Borowiecki commented on KAFKA-3514:
--

Agreed. That's what I meant. Beyond the scope of KIP-138 and let's keep it that 
way.



[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-04-30 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15990281#comment-15990281
 ] 

Matthias J. Sax commented on KAFKA-3514:


Your observation is correct -- and this ticket is exactly about the issue you 
describe. There are two problems with the current design: (1) it's hard to 
reason about processing order and (2) there is some non-determinism. But I 
think it does not affect KIP-138 (I agree that it's somewhat connected). 
KIP-138 is about when/how to punctuate, and the punctuation strategy should be 
agnostic to the way "stream time" gets advanced.



[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-04-30 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15990176#comment-15990176
 ] 

Michal Borowiecki commented on KAFKA-3514:
--

I think the description of this ticket is missing an important detail.
If my understanding is correct, it will behave as described if all the records 
arrive in a single batch.
However, if the records preceding the record with timestamp "1" come in a 
separate batch (I'll use brackets to depict batch boundaries):
{code}
Stream A: [5, 6, 7, 8, 9], [1, 10]

Stream B: [2, 3, 4, 5]
{code}
then initially the timestamp for stream A is going to be set to "5" (minimum of 
the first batch) and since it's not allowed to move back, the second batch 
containing the late arriving record "1" is not going to change that. Stream B 
is going to be drained first until "5".
However, if the batch boundaries are different by just one record and the late 
arriving "1" is in the first batch:
{code}
Stream A: [5, 6, 7, 8, 9, 1], [10]

Stream B: [2, 3, 4, 5]
{code}
 then it's going to behave as currently described.

Please correct me if I got this wrong.
But if that is the case, it feels all too non-deterministic and I think the 
timestamp computation deserves further thought beyond the scope of 
[KIP-138|https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics],
 which is limited to punctuate semantics, but not stream time semantics in 
general.
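
The batch-boundary dependence described in this comment can be illustrated 
with a simplified model. It is a sketch under the comment's own assumptions 
(partition timestamp = minimum over the buffered records, never allowed to 
move back); `PartitionTimeModel` is a hypothetical name and this is not the 
actual Kafka Streams record queue implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified model for illustration only; not Kafka Streams code. */
final class PartitionTimeModel {
    private final Deque<Long> buffered = new ArrayDeque<>();
    private long partitionTime = Long.MIN_VALUE; // no timestamp known yet

    /** Buffer one fetched batch and update the partition timestamp. */
    void addBatch(long... timestamps) {
        for (long t : timestamps) {
            buffered.add(t);
        }
        if (buffered.isEmpty()) {
            return;
        }
        long min = Long.MAX_VALUE;
        for (long t : buffered) {
            min = Math.min(min, t);
        }
        // Assumption taken from the comment: the timestamp never moves back.
        partitionTime = Math.max(partitionTime, min);
    }

    long timestamp() {
        return partitionTime;
    }
}
```

Feeding stream A as `[5, 6, 7, 8, 9]` followed by `[1, 10]` leaves the 
partition timestamp at 5, whereas `[5, 6, 7, 8, 9, 1]` followed by `[10]` 
leaves it at 1, so the same records produce different stream selection 
depending only on where the batch boundary falls.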



[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-15 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15926839#comment-15926839
 ] 

Matthias J. Sax commented on KAFKA-3514:


Thanks for your input. It's an interesting approach. We plan to do a KIP for 
this change, as it will be tricky to get right. It would be great if you would 
participate in the discussion on the dev list. I am not sure at the moment 
when the KIP will be published (see 
https://github.com/apache/kafka/pull/1689#issuecomment-286523692).



[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-15 Thread Arun Mathew (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15926036#comment-15926036
 ] 

Arun Mathew commented on KAFKA-3514:


Hi [~mjsax] [~mihbor]. We were building an audit trail for Kafka based on 
Kafka Streams and encountered a similar issue.

Our workaround was to use a hybrid of event time and system time. During 
regular operation we use event time. But when we create a punctuation 
schedule (the object representing the next punctuation time), we also record 
the system time at which it was created. The punctuate code was modified to 
punctuate anyway if the interval specified in the punctuation schedule has 
elapsed in terms of system time (current time - time of punctuation schedule 
creation). In that case a new punctuation schedule for the next expected 
punctuation time (current punctuation time + punctuation interval) is also 
created.

In an earlier version of Kafka the above logic sufficed, as maybePunctuate 
was called as part of polling for events (even in the absence of events). But 
the current version doesn't seem to call it, so we had to patch that portion a 
bit too.

Please let me know your thoughts.
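
The hybrid workaround described above might look roughly like the following 
sketch. The class `HybridPunctuationSchedule` and its members are 
hypothetical names chosen for illustration; the actual patch is not shown in 
this thread, so the details here are assumptions derived from the prose only.

```java
/** Illustrative sketch of the described workaround; not the actual patch. */
final class HybridPunctuationSchedule {
    final long punctuateAtEventTimeMs; // next punctuation time in event time
    final long intervalMs;             // punctuation interval
    final long createdAtSystemTimeMs;  // wall-clock time when this schedule was created

    HybridPunctuationSchedule(long punctuateAtEventTimeMs, long intervalMs,
                              long createdAtSystemTimeMs) {
        this.punctuateAtEventTimeMs = punctuateAtEventTimeMs;
        this.intervalMs = intervalMs;
        this.createdAtSystemTimeMs = createdAtSystemTimeMs;
    }

    /**
     * Due if event time has reached the scheduled time, or if a full
     * interval has elapsed in system time since the schedule was created.
     */
    boolean isDue(long streamTimeMs, long systemTimeMs) {
        return streamTimeMs >= punctuateAtEventTimeMs
                || systemTimeMs - createdAtSystemTimeMs >= intervalMs;
    }

    /** Schedule for the next expected punctuation time. */
    HybridPunctuationSchedule next(long systemTimeMs) {
        return new HybridPunctuationSchedule(
                punctuateAtEventTimeMs + intervalMs, intervalMs, systemTimeMs);
    }
}
```

A caller would check `isDue(...)` on every poll iteration, including 
iterations that return no records, and replace the schedule with 
`next(...)` after each punctuation.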



[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-06 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897891#comment-15897891
 ] 

Guozhang Wang commented on KAFKA-3514:
--

There are some discussions on pros and cons of changing the punctuate 
semantics: 

https://github.com/apache/kafka/pull/1689



[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-06 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897771#comment-15897771
 ] 

Michal Borowiecki commented on KAFKA-3514:
--

Oh, I wouldn't mind that at all. Just thought that you wanted to stick to event 
time semantics for this, but if you're not precious about that then I'm all for 
it :)



[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-06 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897729#comment-15897729
 ] 

Matthias J. Sax commented on KAFKA-3514:


[~mihbor] I am not sure about this. To me, changing punctuate semantics back 
to system time seems to be the cleaner solution.



[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-06 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897413#comment-15897413
 ] 

Michal Borowiecki commented on KAFKA-3514:
--

Thank you for responding.
Just now I had a thought about the semantics of event time.
It is already possible to provide a TimestampExtractor that determines what the 
event time is, given a message.
It's not far-fetched to assume that users would also want a way to specify what 
the event time is in the absence of messages (on one or more input partitions), 
possibly by providing an implementation other than what 
PartitionGroup.timestamp() does based on the timestamps of its partitions.



[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-06 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897381#comment-15897381
 ] 

Eno Thereska commented on KAFKA-3514:
-

[~mihbor] I understand now. I had thought the effect was limited, but what you 
say makes sense. Thanks.



[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-06 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897371#comment-15897371
 ] 

Michal Borowiecki commented on KAFKA-3514:
--

Hi [~enothereska],
I have to disagree. It is perfectly clear to me (from the documentation) that 
punctuate is based on event time, not system time. However, the problem is 
event time is not advanced reliably, since a single input stream that doesn't 
receive messages will cause the event time to not be advanced. In an extreme 
case of a poorly partitioned topic, I can imagine some partition may never get 
a message. That would cause a topology that has that partition as input to not 
advance event time ever, hence not fire punctuate ever, regardless of the 
presence of messages on its other input topics. In my opinion, if the purpose 
of punctuate is to perform periodic operations, then this flaw makes it unfit 
for that purpose. 
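
The stall can be illustrated with a toy model. This is a sketch under stated assumptions, not the Kafka Streams code: stream time is taken as the minimum over the partition timestamps, a partition that has never received a record reports an assumed sentinel `NOT_KNOWN` (-1), and a punctuation fires whenever stream time reaches the next scheduled time. All names are hypothetical.

```java
import java.util.Arrays;

/** Toy model of min-based stream time driving punctuation; not the Kafka Streams code. */
class PunctuateStallSketch {

    /** Assumed sentinel for a partition that has not seen any record yet. */
    static final long NOT_KNOWN = -1L;

    /** Stream time as the smallest timestamp over all partitions. */
    static long streamTime(long[] partitionTimestamps) {
        return Arrays.stream(partitionTimestamps).min().orElse(NOT_KNOWN);
    }

    /**
     * Counts how often a punctuation scheduled every intervalMs would fire
     * while the partition timestamps move through the given snapshots.
     */
    static int countPunctuations(long[][] snapshots, long intervalMs) {
        long next = intervalMs; // simplification: the first punctuation is due at intervalMs
        int fired = 0;
        for (long[] snapshot : snapshots) {
            long now = streamTime(snapshot);
            if (now >= next) {
                fired++;
                next = now + intervalMs; // simplification: reschedule relative to stream time
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[][] oneIdlePartition = {{100, NOT_KNOWN}, {200, NOT_KNOWN}, {300, NOT_KNOWN}};
        long[][] bothActive = {{100, 100}, {200, 200}, {300, 300}};
        System.out.println(countPunctuations(oneIdlePartition, 100)); // 0: stream time never advances
        System.out.println(countPunctuations(bothActive, 100));       // 3
    }
}
```

In this model a single partition that never receives a message keeps the count at zero, no matter how far the other partition advances.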



[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-03-06 Thread Eno Thereska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897290#comment-15897290
 ] 

Eno Thereska commented on KAFKA-3514:
-

[~mihbor] 2) above is slightly different from the problem you are describing 
(it seems to me). I think the issue you are describing is that punctuate is 
based on event time, not system time.



[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2016-11-21 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15683056#comment-15683056
 ] 

Michal Borowiecki commented on KAFKA-3514:
--

IMO, 2) *is* a severe problem. Punctuate methods (as described by their API) 
are meant to perform periodic operations. As it currently stands, if any of the 
input topics doesn't receive messages regularly, the punctuate method won't be 
called regularly either (due to the min timestamp across all partitions not 
advancing), which violates what the API promises. 
We've worked around it in our app by creating an independent stream and a 
scheduler that regularly sends ticks to an input topic of a Transformer, so that 
its punctuate method is called predictably, but this is far from ideal.
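
The scheduler half of such a workaround might look roughly like the sketch below. It is an assumption about the general shape, not the code from our app: `TickSchedulerSketch` and `tickSink` are hypothetical names, and the sink stands in for whatever actually publishes the tick to the input topic (for example a producer send).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

/** Sketch of a wall-clock tick source; the sink is a stand-in for publishing to a tick topic. */
class TickSchedulerSketch {

    /**
     * Starts a single-threaded scheduler that passes the current wall-clock time
     * to tickSink at a fixed rate. The caller is responsible for shutting it down.
     */
    static ScheduledExecutorService start(long periodMs, LongConsumer tickSink) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(
                () -> tickSink.accept(System.currentTimeMillis()),
                0, periodMs, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public static void main(String[] args) throws InterruptedException {
        // Here the sink only prints; a real application would send the tick to its input topic.
        ScheduledExecutorService scheduler = start(100, ts -> System.out.println("tick " + ts));
        Thread.sleep(350);
        scheduler.shutdownNow();
    }
}
```

Because every tick carries a fresh timestamp, the tick partition keeps advancing even when the other inputs are idle, which is what makes punctuate predictable in this workaround.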

> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Guozhang Wang
> Labels: architecture
> Fix For: 0.10.2.0



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2016-10-10 Thread David J. Garcia (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15562785#comment-15562785
 ] 

David J. Garcia commented on KAFKA-3514:


Maybe just use Max(all_partitions_ts) instead of min?
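
A minimal sketch of the difference, using hypothetical helper names rather than anything from the Kafka Streams code base:

```java
import java.util.Arrays;

/** Compares the two candidate definitions of the task timestamp. */
class MinVsMaxSketch {

    /** Current definition: the smallest partition timestamp. */
    static long minTimestamp(long... partitionTimestamps) {
        return Arrays.stream(partitionTimestamps).min().getAsLong();
    }

    /** Suggested alternative: the largest partition timestamp. */
    static long maxTimestamp(long... partitionTimestamps) {
        return Arrays.stream(partitionTimestamps).max().getAsLong();
    }

    public static void main(String[] args) {
        // Partition A has advanced to 9 while partition B is still at 2.
        System.out.println(minTimestamp(9, 2)); // 2: B holds the task timestamp back
        System.out.println(maxTimestamp(9, 2)); // 9: the task timestamp follows A
    }
}
```

With max, a slow or idle partition no longer holds the task timestamp back, but its records would then carry timestamps below the task timestamp when they are eventually processed.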
