[ 
https://issues.apache.org/jira/browse/KAFKA-12775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17343508#comment-17343508
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12775:
------------------------------------------------

[~nhab] while Bruno is correct that topology changes are never guaranteed to 
be compatible, if you are 100% certain that a change _is_ backwards compatible 
then you can still work around this. Instead of fully resetting the app, or even 
clearing out the entire local state, you can just delete the task directories 
corresponding to the subtopologies that no longer exist. This does require a 
somewhat deeper understanding of Kafka Streams, but imo this seems acceptable, 
as only more advanced users are likely to be able to analyze a change to 
determine whether it's really going to be compatible or not. 
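
A minimal sketch of that cleanup, assuming the usual state layout of 
<state.dir>/<application.id>/<subtopologyId>_<partition> and that the Streams 
instance on the host is stopped while it runs. TaskDirCleanup, deleteTaskDirs, 
and removedSubtopologies are hypothetical names for illustration; the 
subtopology ids have to be worked out by hand, for example by comparing the 
output of Topology#describe() before and after the change.

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.*;
import java.util.stream.*;

// Hypothetical helper, not part of Kafka Streams.
final class TaskDirCleanup {

    // Deletes every task directory under appStateDir whose subtopology id
    // (the part of the directory name before '_') is in removedSubtopologies.
    // Returns the sorted names of the directories that were deleted.
    static List<String> deleteTaskDirs(Path appStateDir,
                                       Set<Integer> removedSubtopologies) throws IOException {
        // Collect candidates first so nothing is deleted while listing.
        List<Path> candidates = new ArrayList<>();
        try (DirectoryStream<Path> dirs = Files.newDirectoryStream(appStateDir)) {
            for (Path dir : dirs) {
                String name = dir.getFileName().toString();
                // Assumed naming scheme: <subtopologyId>_<partition>, e.g. "1_0".
                if (Files.isDirectory(dir) && name.matches("\\d+_\\d+")) {
                    int subtopology = Integer.parseInt(name.substring(0, name.indexOf('_')));
                    if (removedSubtopologies.contains(subtopology)) {
                        candidates.add(dir);
                    }
                }
            }
        }
        List<String> deleted = new ArrayList<>();
        for (Path dir : candidates) {
            deleteRecursively(dir);
            deleted.add(dir.getFileName().toString());
        }
        Collections.sort(deleted);
        return deleted;
    }

    // Deletes a directory tree, children before parents.
    private static void deleteRecursively(Path root) throws IOException {
        try (Stream<Path> walk = Files.walk(root)) {
            List<Path> paths = walk.sorted(Comparator.reverseOrder())
                                   .collect(Collectors.toList());
            for (Path p : paths) {
                Files.delete(p);
            }
        }
    }
}
```

Directories that do not match the assumed naming scheme are left untouched, so 
anything else stored under the application's state directory survives.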

Imo it's also a good idea to force users to manually clean up the parts of the 
topology which don't exist any more. If you don't, the leftover state could 
corrupt your application later, when you make another backwards-compatible 
change which adds a new subtopology. If you didn't clean up after the old one, 
and eventually forgot about it, then adding a new subtopology would not 
actually be compatible (until you do clean it up).

Note that to really clean it up, you'd also need to delete any internal topics 
(e.g. changelogs) that correspond to the removed subtopology.
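
As a rough sketch of how to find those topics: Kafka Streams names internal 
topics <application.id>-<name>-changelog and 
<application.id>-<name>-repartition, so the candidates can be computed from 
the store and repartition names that only the removed subtopology used. 
InternalTopicNames, topicsToDelete, and removedNames are hypothetical names 
for illustration; the list of removed names is an input you have to determine 
yourself, and the result should be reviewed before anything is deleted.

```java
import java.util.*;

// Hypothetical helper, not part of Kafka Streams.
final class InternalTopicNames {

    // Returns the sorted internal topics (changelog and repartition) that
    // exist on the cluster and belong to one of the removed store or
    // repartition names.
    static List<String> topicsToDelete(Collection<String> existingTopics,
                                       String applicationId,
                                       Set<String> removedNames) {
        List<String> result = new ArrayList<>();
        for (String name : removedNames) {
            for (String suffix : List.of("-changelog", "-repartition")) {
                String topic = applicationId + "-" + name + suffix;
                // Only report topics that are actually present.
                if (existingTopics.contains(topic)) {
                    result.add(topic);
                }
            }
        }
        Collections.sort(result);
        return result;
    }
}
```

The returned names could then be passed to Admin#deleteTopics or to 
kafka-topics.sh --delete once the application has been stopped.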

> StreamsPartitionAssignor / ClientState throws an exception when a new Task 
> gets added to a KStreams Application in a Backwards-Compatible Topology Change
> ---------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12775
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12775
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Nico Habermann
>            Priority: Major
>
> KAFKA-6145 and KAFKA-10079 added an [exception if the Partition Assignor 
> tries to look up the lag for a TaskId that seemingly does not 
> exist|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java#L325].
>  
> I believe this is a functional regression.
> Before, it was possible for Streams users to make backwards-compatible 
> topology changes and roll those out, without having to do a complete restore 
> or reload.
> For example:
> Existing sample topology:
>  
> {code:java}
> stream1 = stream(topic)
> stream1
>   .map(...)
>   .to(output){code}
> And doing this backwards-compatible change:
> {code:java}
> stream1 = stream(topic)
> ++ table = stream(topic2).through(repartition-topic)/repartition().toTable()
> stream1
>   .map(...)
> ++  .join(table)
>   .to(output){code}
>  
> This effectively creates a new subtopology with a new task for the table 
> repartition.
> In older KStreams versions, it would have been possible to simply roll this 
> change out.
> Since 2.6, rolling this out will crash the stream because the linked 
> exception gets thrown when StreamsPartitionAssignor#getPreviousTasksByLag 
> tries to look up the lag for the new table-repartition-task.
>  
> At this time, the only possible way to avoid this exception seems to be 
> deleting all local state and doing a complete restore with the new topology 
> change included.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
