[ 
https://issues.apache.org/jira/browse/KAFKA-13678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17499753#comment-17499753
 ] 

Matthias J. Sax commented on KAFKA-13678:
-----------------------------------------

We have a different use case in mind: punctuations are designed to trigger 
independently of the data (ie, the content of the records), based only on time 
progress. The problem we try to solve is that we don't know on startup which 
record will be processed first. In your first run, the first record might have 
timestamp 12, and thus all punctuations would be at 12, 22, 32, 42, 52, ...; 
but in a re-run (due to data expiration on the input topic), your first record 
might have timestamp 26, and thus punctuations would be different now: 26, 36, 
46, ... We want to ensure that we always have the same punctuation times 
independent of the first input record we process, ie, 10, 20, 30, 40 in this 
example. Does this make sense? The use case we have in mind is somewhat 
different from the one you describe: we just do a callback every x seconds 
independent of the content of the records.
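
To make the difference concrete, here is a minimal sketch of the two ways to 
pick punctuation times. It is not the Kafka Streams implementation; the class 
and method names (`PunctuationAlignment`, `recordAnchored`, `zeroAligned`) are 
made up for illustration, and the interval of 10 is taken from the example 
above.

```java
import java.util.ArrayList;
import java.util.List;

public class PunctuationAlignment {

    // Hypothetical helper: punctuation times anchored at the first record's timestamp.
    static List<Long> recordAnchored(long firstTimestamp, long interval, int count) {
        List<Long> times = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            times.add(firstTimestamp + i * interval);
        }
        return times;
    }

    // Hypothetical helper: punctuation times on a fixed grid of multiples of the
    // interval (aligned to 0), starting at the grid point at or below the first record.
    static List<Long> zeroAligned(long firstTimestamp, long interval, int count) {
        long start = Math.floorDiv(firstTimestamp, interval) * interval;
        List<Long> times = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            times.add(start + i * interval);
        }
        return times;
    }

    public static void main(String[] args) {
        // Anchored at the first record: the grid moves with the data.
        System.out.println(recordAnchored(12, 10, 5)); // [12, 22, 32, 42, 52]
        System.out.println(recordAnchored(26, 10, 3)); // [26, 36, 46]

        // Aligned to 0: both runs land on the same grid of multiples of 10.
        System.out.println(zeroAligned(12, 10, 4));    // [10, 20, 30, 40]
        System.out.println(zeroAligned(26, 10, 4));    // [20, 30, 40, 50]
    }
}
```

With the zero-aligned grid, a re-run that starts at a later record produces a 
subset of the same punctuation times rather than a different sequence.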

Your use case makes total sense, but it's different, as you want to schedule 
something data driven, while current punctuations are designed to be data 
independent. I guess the right fix would be to extend the current feature and 
add a "time shift" parameter to the `schedule()` call. Using the input 
record's timestamp, you could compute the "time shift" to align the punctuation 
to the current record's timestamp. Or we could even just accept `-1` as the 
time-shift parameter to say "start now". In the end, it seems you don't really 
want to schedule a _punctuation_ but a _timer_? So we might even consider 
designing the feature differently: a punctuation runs forever until canceled, 
which does not seem to align with your requirement, as it seems you only want 
to trigger it at most once?
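
As a rough sketch of how such a "time shift" could be computed, assuming a 
grid of punctuation times of the form `timeShift + k * interval`: no such 
parameter exists on `schedule()` today, and the names below 
(`TimeShiftSketch`, `timeShiftFor`, `nextPunctuation`) are hypothetical.

```java
public class TimeShiftSketch {

    // Hypothetical: the shift that puts the record's timestamp exactly on the grid.
    static long timeShiftFor(long recordTimestamp, long interval) {
        return Math.floorMod(recordTimestamp, interval);
    }

    // Hypothetical: the first grid point strictly after the given stream time,
    // where grid points are timeShift + k * interval.
    static long nextPunctuation(long streamTime, long interval, long timeShift) {
        long sinceLastGridPoint = Math.floorMod(streamTime - timeShift, interval);
        return streamTime - sinceLastGridPoint + interval;
    }

    public static void main(String[] args) {
        long interval = 10;
        long recordTimestamp = 5; // the t5 example from the issue description

        // No shift (aligned to 0): the next punctuation after t5 is t10.
        System.out.println(nextPunctuation(recordTimestamp, interval, 0)); // 10

        // Shift derived from the record: the next punctuation after t5 is t15, then t25.
        long shift = timeShiftFor(recordTimestamp, interval);
        System.out.println(nextPunctuation(recordTimestamp, interval, shift)); // 15
        System.out.println(nextPunctuation(15, interval, shift));              // 25
    }
}
```

With a shift of 0 this reduces to the current zero-aligned behaviour, so the 
parameter would be an extension rather than a change of the default.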

Maybe you want to contribute such a feature? You would need to do a KIP for it, 
as it would be a public API change: 
[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals] 
We would discuss the details of how the feature is designed on the KIP.

> 2nd punctuation using STREAM_TIME does not respect scheduled interval
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-13678
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13678
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 3.0.0
>            Reporter: Lorenzo Cagnatel
>            Priority: Major
>
> Scheduling a punctuator using stream time, the first punctuation occurs 
> immediately as documented, but the second one is not triggered at *t_schedule 
> + interval* but it could happen before that time. 
> For example, assume that we schedule a punctuation every 10 sec at timestamp 
> 5 (t5). The system now works like this:
> {noformat}
> t5 -> schedule, punctuate, next schedule at t10
> t6 -> no punctuation
> t7 -> no punctuation
> t8 -> no punctuation
> t9 -> no punctuation
> t10 -> punctuate, next schedule at t20
> ...{noformat}
> In this example the 2nd schedule occurs after 5 seconds from the first one, 
> breaking the interval duration.
> From my point of view, a reasonable behaviour could be:
> {noformat}
> t5 -> schedule, punctuate, next schedule at t15
> t6 -> no punctuation
> t7 -> no punctuation
> t8 -> no punctuation
> t9 -> no punctuation
> t10 -> no punctuation
> t11 -> no punctuation
> t12 -> no punctuation
> t13 -> no punctuation
> t14 -> no punctuation
> t15 -> punctuate, next schedule at t25
> ...{noformat}
> The origin of this problem can be found in *StreamTask.schedule*:
> {code:java}
> /**
> * Schedules a punctuation for the processor
> *
> * @param interval the interval in milliseconds
> * @param type the punctuation type
> * @throws IllegalStateException if the current node is not null
> */
> public Cancellable schedule(final long interval, final PunctuationType type, 
> final Punctuator punctuator) {
>    switch (type) {
>       case STREAM_TIME:
>          // align punctuation to 0L, punctuate as soon as we have data
>          return schedule(0L, interval, type, punctuator);
>       case WALL_CLOCK_TIME:
>          // align punctuation to now, punctuate after interval has elapsed
>          return schedule(time.milliseconds() + interval, interval, type, 
> punctuator);
>       default:
>          throw new IllegalArgumentException("Unrecognized PunctuationType: " 
> + type);
>    }
> }{code}
> when, in case of stream time, it calls *schedule* with *startTime=0*.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
