[ https://issues.apache.org/jira/browse/KAFKA-13678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17499753#comment-17499753 ]
Matthias J. Sax commented on KAFKA-13678:
-----------------------------------------

We have a different use case in mind: punctuations are designed to trigger independently of the data (ie, the content of the records), based only on time progress. The problem we try to solve is that we don't know on startup which record will be processed first. In your first run, the first record might have timestamp 12, and thus all punctuations would be at 12, 22, 32, 42, 52... – but in a re-run (due to data expiration on the input topic), your first record might have timestamp 26, and thus punctuations would be different now: 26, 36, 46,... We want to ensure that we always have the same punctuation times independent of the first input record we process, ie, 10, 20, 30, 40 in this example.

Does this make sense? The use case we have in mind is somewhat different from the one you describe: we just do a callback every x seconds independent of the content of the records. Your use case makes total sense, but it's different, as you want to schedule something data driven, while current punctuations are designed to be data independent.

I guess the right fix would be to extend the current feature and add a "time shift" parameter to the `schedule()` call – using the input record timestamp, you can compute the "time shift" to align the punctuation to the current record's timestamp. Or we could even just accept `-1` as the time-shift parameter to say "start now".

In the end, it seems you don't really want to schedule a _punctuation_ but a _timer_? So we might even consider designing the feature differently: a punctuation runs forever until canceled, which does not seem to align with your requirement, as it seems you only want to trigger it at most once? Maybe you want to contribute such a feature?
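
To make the difference concrete, here is a rough sketch of the arithmetic in plain Java. It is not the actual Kafka Streams code: the class `PunctuationAlignment`, the method `nextPunctuation`, and the `anchor` parameter are made up for illustration. With `anchor = 0` the punctuation times stay on a fixed grid regardless of the first record; with `anchor` set to the first record's timestamp (roughly what a "time shift" could achieve), the next punctuation is a full interval after the first one.

```java
// Illustrative only: hypothetical helper, not part of the Kafka Streams API.
final class PunctuationAlignment {

    /**
     * Returns the smallest time of the form anchor + k * interval (k >= 0)
     * that is strictly greater than streamTime, or anchor itself if stream
     * time has not reached the anchor yet.
     */
    static long nextPunctuation(final long anchor, final long interval, final long streamTime) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        if (streamTime < anchor) {
            return anchor;
        }
        final long elapsedIntervals = (streamTime - anchor) / interval;
        return anchor + (elapsedIntervals + 1) * interval;
    }

    public static void main(final String[] args) {
        // Grid anchored at 0, first record at t5: next punctuation at t10.
        System.out.println(nextPunctuation(0L, 10L, 5L));  // prints 10
        // Grid anchored at the first record (t5): next punctuation at t15.
        System.out.println(nextPunctuation(5L, 10L, 5L));  // prints 15
        // Grid anchored at 0, first record at t26 (the re-run case): t30.
        System.out.println(nextPunctuation(0L, 10L, 26L)); // prints 30
    }
}
```

With an interval of 10 and a first record at t5, the first call reproduces the "next schedule at t10" timeline from the issue description, and the second call reproduces the "next schedule at t15" timeline the reporter expected.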
You would need to do a KIP for it, as it would be a public API change: [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals] – We would discuss the details of how the feature is designed on the KIP.

> 2nd punctuation using STREAM_TIME does not respect scheduled interval
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-13678
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13678
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 3.0.0
>            Reporter: Lorenzo Cagnatel
>            Priority: Major
>
> Scheduling a punctuator using stream time, the first punctuation occurs
> immediately as documented, but the second one is not triggered at *t_schedule
> + interval* but it could happen before that time.
> For example, assume that we schedule a punctuation every 10 sec at timestamp
> 5 (t5). The system now works like this:
> {noformat}
> t5 -> schedule, punctuate, next schedule at t10
> t6 -> no punctuation
> t7 -> no punctuation
> t8 -> no punctuation
> t9 -> no punctuation
> t10 -> punctuate, next schedule at t20
> ...{noformat}
> In this example the 2nd schedule occurs after 5 seconds from the first one,
> breaking the interval duration.
> From my point of view, a reasonable behaviour could be:
> {noformat}
> t5 -> schedule, punctuate, next schedule at t15
> t6 -> no punctuation
> t7 -> no punctuation
> t8 -> no punctuation
> t9 -> no punctuation
> t10 -> no punctuation
> t11 -> no punctuation
> t12 -> no punctuation
> t13 -> no punctuation
> t14 -> no punctuation
> t15 -> punctuate, next schedule at t25
> ...{noformat}
> The origin of this problem can be found in *StreamTask.schedule*:
> {code:java}
> /**
>  * Schedules a punctuation for the processor
>  *
>  * @param interval the interval in milliseconds
>  * @param type the punctuation type
>  * @throws IllegalStateException if the current node is not null
>  */
> public Cancellable schedule(final long interval, final PunctuationType type,
>                             final Punctuator punctuator) {
>     switch (type) {
>         case STREAM_TIME:
>             // align punctuation to 0L, punctuate as soon as we have data
>             return schedule(0L, interval, type, punctuator);
>         case WALL_CLOCK_TIME:
>             // align punctuation to now, punctuate after interval has elapsed
>             return schedule(time.milliseconds() + interval, interval, type,
>                             punctuator);
>         default:
>             throw new IllegalArgumentException("Unrecognized PunctuationType: "
>                                                + type);
>     }
> }{code}
> when, in case of stream time, it calls *schedule* with *startTime=0*.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)