[
https://issues.apache.org/jira/browse/KAFKA-9467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17021696#comment-17021696
]
Sophie Blee-Goldman commented on KAFKA-9467:
--------------------------------------------
Obviously option 3 is a lot more work and may not be worth it, but the option
is always there. That means we can choose to go with 1 or 2 now, and if it
seems like we made the wrong choice based on actual user needs then we can
always add a new type and implement the other option.
> Multiple wallclock punctuators may be scheduled after a rebalance
> -----------------------------------------------------------------
>
> Key: KAFKA-9467
> URL: https://issues.apache.org/jira/browse/KAFKA-9467
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Sophie Blee-Goldman
> Priority: Major
>
> In the eager rebalancing protocol*, Streams will suspend all tasks at the
> beginning of a rebalance and then resume those which have been reassigned to
> the same StreamThread. Part of suspending and resuming a task involves
> closing and reinitializing the topology, specifically calling Processor#close
> followed by Processor#init. If a wallclock punctuator is scheduled as part of
> init, it will be rescheduled again after every rebalance. Streams does not
> cancel existing punctuators during suspension, and does not tell users they
> must cancel punctuations themselves during Processor#close.
> This can cause multiple punctuators to build up over time, which has the
> apparent effect of increasing the net punctuation rate for wallclock
> punctuators. (The same technically occurs with event-time punctuators, but
> the punctuation times are anchored relative to a fixed point and only one
> will be triggered at a time, so there is no increased punctuation rate).
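
The build-up described above can be reproduced with a small simulation. This is only a sketch in plain Java: the class and method names are hypothetical stand-ins rather than Kafka Streams APIs, and it assumes 1 ms ticks with a rebalance at fixed intervals. Each simulated rebalance re-runs init, which schedules one more punctuator; the cancelOnClose flag models cancelling the old one in close.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a task's wallclock punctuation schedule.
// None of these names come from Kafka Streams.
class PunctuatorBuildUp {

    /** One scheduled punctuator: fires whenever now >= nextFireMs. */
    static final class Scheduled {
        long nextFireMs;
        boolean cancelled;
        Scheduled(long nextFireMs) { this.nextFireMs = nextFireMs; }
    }

    /**
     * Simulates a task over [0, durationMs) in 1 ms ticks. At every rebalance
     * the processor is closed and re-initialized, and init schedules a new
     * punctuator. Returns the total number of punctuations fired.
     */
    static int countPunctuations(long intervalMs, long rebalanceEveryMs,
                                 long durationMs, boolean cancelOnClose) {
        List<Scheduled> schedule = new ArrayList<>();
        Scheduled current = new Scheduled(intervalMs); // scheduled by the first init at t=0
        schedule.add(current);
        int fired = 0;
        for (long now = 1; now < durationMs; now++) {
            if (now % rebalanceEveryMs == 0) {
                if (cancelOnClose) {
                    current.cancelled = true;              // close: cancel the old punctuator
                }
                current = new Scheduled(now + intervalMs); // init: schedule a new one
                schedule.add(current);
            }
            for (Scheduled s : schedule) {
                if (!s.cancelled && now >= s.nextFireMs) {
                    fired++;
                    s.nextFireMs += intervalMs;
                }
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println("no rebalance:       " + countPunctuations(10, Long.MAX_VALUE, 100, false));
        System.out.println("rebalance, leaked:  " + countPunctuations(10, 25, 100, false));
        System.out.println("rebalance, cancel:  " + countPunctuations(10, 25, 100, true));
    }
}
```

With a 10 ms interval over 100 ms, the model fires 9 times without rebalances, 22 times when a rebalance every 25 ms leaks a punctuator each time, and 8 times when the old punctuator is cancelled in close.
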
> There are several options at this point:
> A) Clear/cancel any existing punctuators during task suspension
> B) Push it to the user to cancel their punctuators in Processor#close, and
> update the documentation and examples to clarify this.
> C) Leave existing punctuators alone during suspension, and instead block new
> ones from being scheduled on top during re-initialization.
> One drawback of options A and B is that cancelling/rescheduling punctuators
> can mean a punctuation is never triggered if rebalances are more frequent
> than the punctuation interval. Even if they are still triggered, the
> effective punctuation interval will actually increase as each rebalance
> delays the punctuation.
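
To put numbers on this drawback, here is a minimal sketch of a timer that is cancelled and rescheduled at every rebalance. The names are hypothetical and the fixed rebalance period is an assumption made only to keep the arithmetic simple.

```java
// Hypothetical model: one wallclock punctuator whose countdown restarts
// at every rebalance (cancel + reschedule). Not a Kafka Streams API.
class ResetOnRebalance {

    /** Punctuations fired in [0, durationMs), simulated in 1 ms ticks. */
    static int punctuations(long intervalMs, long rebalanceEveryMs, long durationMs) {
        long nextFireMs = intervalMs; // first schedule happens at t=0
        int fired = 0;
        for (long now = 1; now < durationMs; now++) {
            if (now % rebalanceEveryMs == 0) {
                nextFireMs = now + intervalMs; // rebalance restarts the countdown
            }
            if (now >= nextFireMs) {
                fired++;
                nextFireMs += intervalMs;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println("no rebalance:        " + punctuations(30, Long.MAX_VALUE, 600));
        System.out.println("rebalance every 40:  " + punctuations(30, 40, 600));
        System.out.println("rebalance every 20:  " + punctuations(30, 20, 600));
    }
}
```

With a 30 ms interval over 600 ms, the model fires 19 times without rebalances, 15 times with a rebalance every 40 ms (one punctuation per 40 ms on average instead of one per 30 ms), and never with a rebalance every 20 ms.
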
> Of course, if the task _does_ get migrated to another thread/instance the
> punctuation would be reset anyways with option C, since we do not currently
> store/persist the punctuation information anywhere. The wallclock semantics
> are somewhat loosely defined, but I think most users would not consider any
> of these a proper fix on their own as it just pushes the issue in the other
> direction.
> Of course, if we were to anchor the wallclock punctuations to a fixed time
> then this would not be a problem. At that point it seems reasonable to just
> leave it up to the user to cancel the punctuation during Processor#close,
> similar to any other kind of resource that must be cleaned up. Even if users
> forget to do so, it would not affect the actual behavior; it would just cause unused
> punctuators to build up. See https://issues.apache.org/jira/browse/KAFKA-7699.
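
One possible reading of "anchoring" is sketched below; it is an assumption for illustration and not necessarily what KAFKA-7699 proposes. The next punctuation time is derived from a fixed anchor and the interval, so it does not depend on when the punctuator was (re)scheduled. All names are hypothetical.

```java
// Hypothetical helper comparing anchored and unanchored wallclock schedules.
class AnchoredSchedule {

    /** Smallest anchorMs + k * intervalMs (k >= 1) that is strictly after nowMs. */
    static long nextFireAnchored(long anchorMs, long intervalMs, long nowMs) {
        if (nowMs < anchorMs) {
            return anchorMs + intervalMs;
        }
        long elapsedIntervals = (nowMs - anchorMs) / intervalMs;
        return anchorMs + (elapsedIntervals + 1) * intervalMs;
    }

    /** Unanchored behavior: the countdown restarts at scheduling time. */
    static long nextFireUnanchored(long scheduledAtMs, long intervalMs) {
        return scheduledAtMs + intervalMs;
    }

    public static void main(String[] args) {
        // Rescheduling after rebalances at t=25 and t=27, interval 10 ms, anchor 0.
        System.out.println(nextFireAnchored(0, 10, 25));
        System.out.println(nextFireAnchored(0, 10, 27));
        System.out.println(nextFireUnanchored(25, 10));
        System.out.println(nextFireUnanchored(27, 10));
    }
}
```

In this model a punctuator rescheduled at t=25 or t=27 still fires next at t=30 when anchored, whereas the unanchored countdown moves to t=35 or t=37.
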
> Given this, I think the options for a complete solution are:
> 1) Implement KAFKA-7699 and then do A or B
> 2) Persist the current punctuation schedule while migrating a task
> (presumably in the Subscription userdata) and then do C
> Choosing the best course of action here is probably blocked on a decision on
> whether or not we want to anchor wallclock punctuations (KAFKA-7699). If we
> can't get consensus on that, we could always
> 3) Introduce a third type of punctuation, then do both 1 and 2 (for the new
> "anchored-wall-clock" type and the existing "wall-clock" type, respectively).
>
> *Another naive workaround for this issue is to turn on/upgrade to
> cooperative rebalancing, which will not suspend and resume all active tasks
> during a rebalance, and only suspend tasks that will be immediately closed
> and migrated to another instance or StreamThread. Of course, this will still
> cause the punctuation to be reset for tasks that _are_ actually
> closed/migrated, so practically speaking it's identical to option C alone.