[
https://issues.apache.org/jira/browse/KAFKA-13678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17499753#comment-17499753
]
Matthias J. Sax commented on KAFKA-13678:
-----------------------------------------
We have a different use case in mind: punctuations are designed to trigger
independent of the data (ie, the content of the records), based only on time
progress. The problem we are trying to solve is that we don't know on startup
which record will be processed first. Thus, in your first run, the first record
might have timestamp 12, and the punctuations would be at 12, 22, 32, 42, 52...
But in a re-run (due to data expiration on the input topic), your first record
might have timestamp 26, and the punctuations would be different: 26, 36,
46,... We want to ensure that we always have the same punctuation times
independent of the first input record we process, ie, 10, 20, 30, 40 in this
example. Does this make sense? The use case we have in mind is somewhat
different from the one you describe: we just do a callback every x seconds
independent of the content of the records.
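To make the difference concrete, here is a minimal sketch that compares the two schemes for the two runs above (first record at timestamp 12 or 26, interval 10). It is plain Java and does not call any Kafka Streams API; the class and method names are made up for illustration, it assumes non-negative timestamps and a positive interval, and it leaves out the immediate punctuation that fires on the first record.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka Streams.
final class PunctuationAlignment {

    // Punctuation times anchored to the first record: first, first + interval, ...
    static List<Long> anchoredToFirstRecord(long firstTimestamp, long interval, int count) {
        List<Long> times = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            times.add(firstTimestamp + i * interval);
        }
        return times;
    }

    // Punctuation times on the fixed grid 0, interval, 2 * interval, ...
    // starting with the first grid point strictly after firstTimestamp.
    static List<Long> alignedToZero(long firstTimestamp, long interval, int count) {
        long next = (firstTimestamp / interval + 1) * interval;
        List<Long> times = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            times.add(next + i * interval);
        }
        return times;
    }

    public static void main(String[] args) {
        System.out.println(anchoredToFirstRecord(12, 10, 4)); // [12, 22, 32, 42]
        System.out.println(anchoredToFirstRecord(26, 10, 3)); // [26, 36, 46]
        System.out.println(alignedToZero(12, 10, 4));         // [20, 30, 40, 50]
        System.out.println(alignedToZero(26, 10, 3));         // [30, 40, 50]
    }
}
```

With the anchored scheme the two runs never share a punctuation time, while the zero-aligned runs land on the same grid regardless of which record happens to come first.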
Your use case makes total sense, but it's different, as you want to schedule
something data driven, while current punctuations are designed to be data
independent. I guess the right fix would be to extend the current feature and
add a "time shift" parameter to the `schedule()` call: using the input
record timestamp, you can compute the "time shift" to align the punctuation to
the current record's timestamp. Or we could even just accept `-1` as the
time-shift parameter to say "start now". In the end, it seems you don't really
want to schedule a _punctuation_ but a _timer_? So we might even consider
designing the feature differently: a punctuation runs forever until
canceled, which does not seem to align with your requirement, as it seems you
only want to trigger it at most once?
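As a rough illustration of the "time shift" idea, the sketch below computes a shift from a record timestamp and then finds the next punctuation time on the shifted grid. No such parameter exists on `schedule()` today; the class, the method names, and the exact semantics of the shift are assumptions made only for this example, and a real design would be settled in a KIP.

```java
// Hypothetical sketch of a "time shift" computation, not a Kafka Streams API.
final class TimeShiftSketch {

    // Offset of the record timestamp from the zero-aligned grid, in [0, interval).
    static long timeShiftFor(long recordTimestamp, long interval) {
        return Math.floorMod(recordTimestamp, interval);
    }

    // Smallest time of the form k * interval + timeShift that is strictly
    // greater than streamTime.
    static long nextPunctuation(long streamTime, long interval, long timeShift) {
        long k = Math.floorDiv(streamTime - timeShift, interval) + 1;
        return k * interval + timeShift;
    }

    public static void main(String[] args) {
        long interval = 10;
        long shift = timeShiftFor(5, interval);
        System.out.println(shift);                               // 5
        System.out.println(nextPunctuation(5, interval, shift)); // 15
        System.out.println(nextPunctuation(15, interval, shift)); // 25
        // A shift of 0 corresponds to the zero-aligned grid.
        System.out.println(nextPunctuation(5, interval, 0));     // 10
    }
}
```

With a shift of 5 the schedule becomes t5, t15, t25, which is the behaviour asked for in the issue, while a shift of 0 gives the zero-aligned t10, t20 schedule.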
Maybe you want to contribute such a feature? You would need to do a KIP for it,
as it would be a public API change:
[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]
We would discuss the details of how the feature is designed on the KIP.
> 2nd punctuation using STREAM_TIME does not respect scheduled interval
> ---------------------------------------------------------------------
>
> Key: KAFKA-13678
> URL: https://issues.apache.org/jira/browse/KAFKA-13678
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 3.0.0
> Reporter: Lorenzo Cagnatel
> Priority: Major
>
> When scheduling a punctuator using stream time, the first punctuation occurs
> immediately as documented, but the second one is not triggered at *t_schedule
> + interval*; it can happen before that time.
> For example, assume that we schedule a punctuation every 10 sec at timestamp
> 5 (t5). The system now works like this:
> {noformat}
> t5 -> schedule, punctuate, next schedule at t10
> t6 -> no punctuation
> t7 -> no punctuation
> t8 -> no punctuation
> t9 -> no punctuation
> t10 -> punctuate, next schedule at t20
> ...{noformat}
> In this example the 2nd punctuation occurs 5 seconds after the first one,
> breaking the interval duration.
> From my point of view, a reasonable behaviour could be:
> {noformat}
> t5 -> schedule, punctuate, next schedule at t15
> t6 -> no punctuation
> t7 -> no punctuation
> t8 -> no punctuation
> t9 -> no punctuation
> t10 -> no punctuation
> t11 -> no punctuation
> t12 -> no punctuation
> t13 -> no punctuation
> t14 -> no punctuation
> t15 -> punctuate, next schedule at t25
> ...{noformat}
> The origin of this problem can be found in *StreamTask.schedule*:
> {code:java}
> /**
>  * Schedules a punctuation for the processor
>  *
>  * @param interval the interval in milliseconds
>  * @param type the punctuation type
>  * @throws IllegalStateException if the current node is not null
>  */
> public Cancellable schedule(final long interval, final PunctuationType type,
>                             final Punctuator punctuator) {
>     switch (type) {
>         case STREAM_TIME:
>             // align punctuation to 0L, punctuate as soon as we have data
>             return schedule(0L, interval, type, punctuator);
>         case WALL_CLOCK_TIME:
>             // align punctuation to now, punctuate after interval has elapsed
>             return schedule(time.milliseconds() + interval, interval, type,
>                             punctuator);
>         default:
>             throw new IllegalArgumentException("Unrecognized PunctuationType: "
>                                                + type);
>     }
> }{code}
> when, in case of stream time, it calls *schedule* with *startTime=0*.