[ 
https://issues.apache.org/jira/browse/KAFKA-9467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17021696#comment-17021696
 ] 

Sophie Blee-Goldman commented on KAFKA-9467:
--------------------------------------------

Option 3 is clearly a lot more work and may not be worth it, but it always 
remains available. That means we can go with option 1 or 2 now, and if actual 
user needs show that we made the wrong choice, we can always add a new 
punctuation type and implement the other option.

> Multiple wallclock punctuators may be scheduled after a rebalance
> -----------------------------------------------------------------
>
>                 Key: KAFKA-9467
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9467
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Sophie Blee-Goldman
>            Priority: Major
>
> In the eager rebalancing protocol*, Streams will suspend all tasks at the 
> beginning of a rebalance and then resume those which have been reassigned to 
> the same StreamThread. Part of suspending and resuming a task involves 
> closing and reinitializing the topology, specifically calling Processor#close 
> followed by Processor#init. If a wallclock punctuator is scheduled as part of 
> init, it will be rescheduled again after every rebalance. Streams does not 
> cancel existing punctuators during suspension, and does not tell users they 
> must cancel punctuations themselves during Processor#close.
> This can cause multiple punctuators to build up over time, which has the 
> apparent effect of increasing the net punctuation rate for wallclock 
> punctuators. (The same technically occurs with event-time punctuators, but 
> the punctuation times are anchored relative to a fixed point and only one 
> will be triggered at a time, so there is no increased punctuation rate).
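
To make the build-up concrete, here is a minimal toy model of the effect 
described above. It is only a sketch: ToyScheduler and countPunctuations are 
hypothetical names, the millisecond tick loop stands in for wall-clock time, 
and none of this is actual Kafka Streams code. Each simulated rebalance calls 
schedule again, the way a re-run of Processor#init would, without cancelling 
anything.

```java
import java.util.ArrayList;
import java.util.List;

final class PunctuatorBuildUp {
    /** Hypothetical stand-in for a wall-clock punctuation scheduler; not Kafka Streams code. */
    static final class ToyScheduler {
        // Each entry is {nextFireMs, intervalMs}.
        private final List<long[]> schedules = new ArrayList<>();

        void schedule(long nowMs, long intervalMs) {
            schedules.add(new long[] {nowMs + intervalMs, intervalMs});
        }

        /** Fires every punctuator that is due at nowMs and returns how many fired. */
        int punctuate(long nowMs) {
            int fired = 0;
            for (long[] s : schedules) {
                while (s[0] <= nowMs) {
                    fired++;
                    s[0] += s[1];
                }
            }
            return fired;
        }
    }

    /** Counts punctuations up to durationMs when init() runs again at each rebalance time. */
    static int countPunctuations(long intervalMs, long durationMs, long[] rebalanceTimesMs) {
        ToyScheduler scheduler = new ToyScheduler();
        scheduler.schedule(0, intervalMs); // first init()
        int total = 0;
        int next = 0;
        for (long now = 1; now <= durationMs; now++) {
            if (next < rebalanceTimesMs.length && rebalanceTimesMs[next] == now) {
                scheduler.schedule(now, intervalMs); // init() again, nothing was cancelled
                next++;
            }
            total += scheduler.punctuate(now);
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(countPunctuations(10, 60, new long[] {}));       // prints 6
        System.out.println(countPunctuations(10, 60, new long[] {15, 25})); // prints 13
    }
}
```

With a 10 ms interval over 60 ms, two rebalances take the count from 6 to 13 
punctuations in this model, because three independent schedules end up firing 
side by side.
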
> There are several options at this point:
> A) Clear/cancel any existing punctuators during task suspension
> B) Push it to the user to cancel their punctuators in Processor#close, and 
> update the documentation and examples to clarify this.
> C) Leave existing punctuators alone during suspension, and instead block new 
> ones from being scheduled on top during re-initialization.
> One drawback of options A and B is that cancelling/rescheduling punctuators 
> can mean a punctuation is never triggered if rebalances are more frequent 
> than the punctuation interval. Even if punctuations are still triggered, the 
> effective punctuation interval will actually increase, since each rebalance 
> delays the next punctuation.
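
The drawback of cancelling and rescheduling can be illustrated with another 
small toy model. Again this is a sketch under assumptions: fireTimes is a 
hypothetical helper, time advances in 1 ms ticks, and a rebalance is modelled 
as simply restarting the timer from the current time.

```java
import java.util.ArrayList;
import java.util.List;

final class CancelAndReschedule {
    /**
     * Toy model (not Kafka Streams code): a single wall-clock punctuator that is
     * cancelled and rescheduled from scratch at every rebalance.
     * Returns the times at which it fires, up to and including durationMs.
     */
    static List<Long> fireTimes(long intervalMs, long durationMs, List<Long> rebalanceTimesMs) {
        List<Long> fired = new ArrayList<>();
        long nextFire = intervalMs; // scheduled at time 0
        for (long now = 1; now <= durationMs; now++) {
            if (rebalanceTimesMs.contains(now)) {
                nextFire = now + intervalMs; // old schedule is dropped, the timer restarts
            }
            if (now == nextFire) {
                fired.add(now);
                nextFire += intervalMs;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Rebalances every 8 ms with a 10 ms interval: the punctuator never fires.
        System.out.println(fireTimes(10, 40, List.of(8L, 16L, 24L, 32L, 40L))); // prints []
        // A single rebalance at 15 ms pushes the second punctuation from 20 ms to 25 ms.
        System.out.println(fireTimes(10, 40, List.of(15L))); // prints [10, 25, 35]
    }
}
```

In the first call every rebalance arrives before the restarted timer expires, 
so nothing is ever triggered. In the second, the gap between the first two 
punctuations stretches from 10 ms to 15 ms.
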
> On the other hand, if the task _does_ get migrated to another 
> thread/instance, the punctuation would be reset anyway under option C, since 
> we do not currently store/persist the punctuation information anywhere. The 
> wallclock semantics are somewhat loosely defined, but I think most users 
> would not consider any of these a proper fix on its own, as each just pushes 
> the issue in the other direction.
> If we were to anchor the wallclock punctuations to a fixed time, however, 
> this would not be a problem. At that point it seems reasonable to just 
> leave it up to the user to cancel the punctuation during Processor#close, 
> as with any other kind of resource that must be cleaned up. Even if users 
> forgot to do so, it would not affect the actual behavior; it would just cause 
> unused punctuators to build up. See 
> https://issues.apache.org/jira/browse/KAFKA-7699.
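
One way to picture anchoring is as a fixed grid of punctuation times. The 
sketch below is an assumption about what "anchored to a fixed time" could 
mean, not the design proposed in KAFKA-7699; nextPunctuation, anchorMs and the 
grid formula are hypothetical. The point is that the next punctuation time 
depends only on the anchor and the interval, not on when schedule was called.

```java
final class AnchoredSchedule {
    /**
     * Returns the first time strictly after nowMs on the grid
     * anchorMs + k * intervalMs (k = 1, 2, ...).
     * Hypothetical helper; the real anchoring semantics may differ.
     */
    static long nextPunctuation(long anchorMs, long intervalMs, long nowMs) {
        if (intervalMs <= 0 || nowMs < anchorMs) {
            throw new IllegalArgumentException("need intervalMs > 0 and nowMs >= anchorMs");
        }
        long elapsed = nowMs - anchorMs;
        return anchorMs + (elapsed / intervalMs + 1) * intervalMs;
    }

    public static void main(String[] args) {
        // Re-scheduling at 15 ms or at 17 ms lands on the same grid point.
        System.out.println(nextPunctuation(0, 10, 15)); // prints 20
        System.out.println(nextPunctuation(0, 10, 17)); // prints 20
        // Exactly on a grid point, the next punctuation is one interval later.
        System.out.println(nextPunctuation(0, 10, 20)); // prints 30
    }
}
```

Under this assumption, a punctuator that is re-registered after a rebalance 
picks up the same schedule as before, so the rebalance neither delays nor 
shifts the punctuation times.
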
> Given this, I think the options for a complete solution are:
> 1) Implement KAFKA-7699 and then do A or B
> 2) Persist the current punctuation schedule while migrating a task 
> (presumably in the Subscription userdata) and then do C
> Choosing the best course of action here is probably blocked on a decision on 
> whether or not we want to anchor wallclock punctuations (KAFKA-7699). If we 
> can't get consensus on that, we could always
> 3) Introduce a third type of punctuation, then do both 1 and 2 (for the new 
> "anchored-wall-clock" type and the existing "wall-clock" type, respectively).
>  
> *Another naive workaround for this issue is to turn on/upgrade to 
> cooperative rebalancing, which does not suspend and resume all active tasks 
> during a rebalance; it suspends only the tasks that will be immediately 
> closed and migrated to another instance or StreamThread. Of course, this will 
> still cause the punctuation to be reset for tasks that _are_ actually 
> closed/migrated, so practically speaking it's identical to option C alone.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
