[ https://issues.apache.org/jira/browse/KAFKA-13678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-13678:
------------------------------------
    Issue Type: Improvement  (was: Bug)

> 2nd punctuation using STREAM_TIME does not respect scheduled interval
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-13678
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13678
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 3.0.0
>            Reporter: Lorenzo Cagnatel
>            Priority: Major
>
> When a punctuator is scheduled using stream time, the first punctuation
> occurs immediately, as documented, but the second one is not necessarily
> triggered at *t_schedule + interval*; it can happen earlier.
> For example, assume that we schedule a punctuation every 10 sec at timestamp
> 5 (t5). The system currently works like this:
> {noformat}
> t5 -> schedule, punctuate, next schedule at t10
> t6 -> no punctuation
> t7 -> no punctuation
> t8 -> no punctuation
> t9 -> no punctuation
> t10 -> punctuate, next schedule at t20
> ...{noformat}
> In this example the 2nd punctuation occurs only 5 seconds after the first
> one, which violates the 10-second interval.
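
The timeline above can be reproduced with a small standalone simulation. This is only a sketch, not Kafka's implementation: the class PunctuationSketch and the method firings are hypothetical names, time is counted in the same abstract units as t5, t10, and so on, and the rule assumed here is that a punctuator fires once stream time reaches the next scheduled time, after which the next time is aligned to startTime plus a whole number of intervals.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Kafka.
class PunctuationSketch {

    // Returns the stream times at which a punctuation would fire, assuming
    // the schedule is aligned to startTime + k * interval.
    static List<Long> firings(long startTime, long interval, long[] streamTimes) {
        List<Long> fired = new ArrayList<>();
        long next = startTime;
        for (long now : streamTimes) {
            if (now >= next) {
                fired.add(now);
                // Advance to the first aligned slot strictly after 'now'.
                long intervalsElapsed = (now - startTime) / interval;
                next = startTime + (intervalsElapsed + 1) * interval;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] times = new long[16]; // stream times t5 .. t20
        for (int i = 0; i < times.length; i++) {
            times[i] = 5 + i;
        }
        // Aligned to 0, as in the reported behaviour.
        System.out.println(firings(0L, 10L, times)); // prints [5, 10, 20]
        // Aligned to the scheduling time t5, for comparison.
        System.out.println(firings(5L, 10L, times)); // prints [5, 15]
    }
}
```

With an alignment of 0 the second firing lands at t10, only 5 units after the first; aligning to the scheduling time instead would move it to t15.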
> From my point of view, a more reasonable behaviour would be:
> {noformat}
> t5 -> schedule, punctuate, next schedule at t15
> t6 -> no punctuation
> t7 -> no punctuation
> t8 -> no punctuation
> t9 -> no punctuation
> t10 -> no punctuation
> t11 -> no punctuation
> t12 -> no punctuation
> t13 -> no punctuation
> t14 -> no punctuation
> t15 -> punctuate, next schedule at t25
> ...{noformat}
> The origin of this problem can be found in {*}StreamTask.schedule{*}:
> {code:java}
> /**
>  * Schedules a punctuation for the processor
>  *
>  * @param interval the interval in milliseconds
>  * @param type the punctuation type
>  * @throws IllegalStateException if the current node is not null
>  */
> public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator punctuator) {
>     switch (type) {
>         case STREAM_TIME:
>             // align punctuation to 0L, punctuate as soon as we have data
>             return schedule(0L, interval, type, punctuator);
>         case WALL_CLOCK_TIME:
>             // align punctuation to now, punctuate after interval has elapsed
>             return schedule(time.milliseconds() + interval, interval, type, punctuator);
>         default:
>             throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
>     }
> }{code}
> In the stream-time case, it calls *schedule* with {*}startTime=0{*}, so
> punctuations are aligned to multiples of the interval rather than to the time
> at which the punctuator was scheduled.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)