[
https://issues.apache.org/jira/browse/KAFKA-13627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785518#comment-17785518
]
Nicholas Telford commented on KAFKA-13627:
------------------------------------------
Hi,
I've temporarily parked KIP-816 to focus on KIP-892, but if you'd like to take
it over, I'm happy for you to do so.
My team have an implementation of "Option B" in production, which appears to
work well and has proved very reliable. It's written in Kotlin, but should be
trivial to translate to Java if you would like to use it as the basis of an
implementation. You can see it here:
[https://gist.github.com/nicktelford/15cc596a25de33a673bb5bd4c81edd0f]
When I explored Options A and C, I ran into many difficulties: some places
depended on the TaskId being present in the state directory path, and others
depended on the format of the TaskId. I would therefore highly recommend
pursuing Option B, since it's easy to implement and reliable, and its logic can
be isolated from the rest of Kafka Streams, making it easy to maintain.
> Topology changes shouldn't require a full reset of local state
> --------------------------------------------------------------
>
> Key: KAFKA-13627
> URL: https://issues.apache.org/jira/browse/KAFKA-13627
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 3.1.0
> Reporter: Nicholas Telford
> Priority: Major
>
> [KIP-816|https://cwiki.apache.org/confluence/display/KAFKA/KIP-816%3A+Topology+changes+without+local+state+reset]
> When changes that modify its structure are made to a Topology, users must
> use the Application Reset tool to reset the local state of their application
> prior to deploying the change. Consequently, these changes require rebuilding
> all local state stores from their changelog topics in Kafka.
> The time and cost of rebuilding state stores are determined by the size of the
> state stores and their recent write history, as rebuilding a store entails
> replaying all recent writes to the store. For applications that have very
> large stores, or stores with extremely high write-rates, the time and cost of
> rebuilding all state in the application can be prohibitive. This
> is a significant barrier to building highly scalable applications with good
> availability.
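To make the cost argument concrete, here is a rough, hypothetical estimate: restore time is approximately the number of changelog bytes that must be replayed divided by the sustained restore throughput. The class name `RestoreEstimate` and the figures in `main` (500 GiB replayed at 100 MiB/s) are illustrative assumptions, not measurements; real throughput depends on brokers, network, disk and RocksDB tuning.

```java
/**
 * Back-of-the-envelope estimate of a full state restore.
 * Hypothetical helper; all inputs are assumptions, not measurements.
 */
final class RestoreEstimate {
    private RestoreEstimate() {}

    /** Seconds needed to replay changelogBytes at a sustained bytesPerSecond. */
    static double restoreSeconds(long changelogBytes, long bytesPerSecond) {
        if (bytesPerSecond <= 0) {
            throw new IllegalArgumentException("bytesPerSecond must be positive");
        }
        return (double) changelogBytes / bytesPerSecond;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        long gib = 1024L * mib;
        // Assumed: 500 GiB of changelog data replayed at 100 MiB/s.
        double seconds = restoreSeconds(500L * gib, 100L * mib);
        System.out.printf("%.0f s (~%.0f min)%n", seconds, seconds / 60.0);
    }
}
```

With these assumed inputs it prints `5120 s (~85 min)`.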
> Changes to the Topology that do not directly affect a state store should not
> require the local state of that store to be reset/deleted. This would allow
> applications to scale to very large data sets, whilst permitting the
> application behaviour to evolve over time.
> h1. Background
> Tasks in a Kafka Streams Topology are logically grouped by “Topic Group”
> (aka. Subtopology). Topic Groups are assigned an ordinal (number), based on
> their position in the Topology. This Topic Group ordinal is used as the
> prefix for all Task IDs: {{<topic-group-ordinal>_<partition-number>}},
> e.g. {{2_14}}.
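A minimal sketch of the Task ID format described above, using a hypothetical `SimpleTaskId` class rather than Kafka Streams' own task ID type; it only shows how the Topic Group ordinal and partition number combine into strings like `2_14`.

```java
/**
 * Hypothetical value type mirroring the
 * "<topic-group-ordinal>_<partition-number>" Task ID format.
 */
final class SimpleTaskId {
    final int topicGroup;
    final int partition;

    SimpleTaskId(int topicGroup, int partition) {
        if (topicGroup < 0 || partition < 0) {
            throw new IllegalArgumentException("ordinal and partition must be non-negative");
        }
        this.topicGroup = topicGroup;
        this.partition = partition;
    }

    /** Parses e.g. "2_14" into topic group 2, partition 14. */
    static SimpleTaskId parse(String id) {
        int sep = id.indexOf('_');
        if (sep <= 0 || sep == id.length() - 1) {
            throw new IllegalArgumentException("expected <ordinal>_<partition>: " + id);
        }
        return new SimpleTaskId(
            Integer.parseInt(id.substring(0, sep)),
            Integer.parseInt(id.substring(sep + 1)));
    }

    @Override
    public String toString() {
        return topicGroup + "_" + partition;
    }

    public static void main(String[] args) {
        SimpleTaskId id = SimpleTaskId.parse("2_14");
        System.out.println(id.topicGroup + " " + id.partition + " " + id); // prints "2 14 2_14"
    }
}
```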
> If new Topic Groups are added, old Topic Groups are removed, or existing
> Topic Groups are re-arranged, this can cause the assignment of ordinals to
> change _even for Topic Groups that have not been modified_.
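The renumbering effect can be sketched with a simplified model in which a Topic Group's ordinal is just its index in an ordered list. The group names (`dedupe`, `enrich`, `aggregate`, `filter`) and the `OrdinalAssignment` class are hypothetical, and Kafka Streams' actual ordering logic is more involved; the point is only that inserting one new group shifts the ordinal of every unchanged group after it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model: ordinal = position of the group in topology order. */
final class OrdinalAssignment {
    private OrdinalAssignment() {}

    static Map<String, Integer> assignOrdinals(List<String> groupsInTopologyOrder) {
        Map<String, Integer> ordinals = new LinkedHashMap<>();
        for (int i = 0; i < groupsInTopologyOrder.size(); i++) {
            ordinals.put(groupsInTopologyOrder.get(i), i);
        }
        return ordinals;
    }

    public static void main(String[] args) {
        List<String> before = List.of("dedupe", "enrich", "aggregate");
        List<String> after = new ArrayList<>(before);
        after.add(0, "filter"); // a new group inserted ahead of the existing ones

        System.out.println(assignOrdinals(before)); // {dedupe=0, enrich=1, aggregate=2}
        System.out.println(assignOrdinals(after));  // {filter=0, dedupe=1, enrich=2, aggregate=3}
    }
}
```

None of `dedupe`, `enrich` or `aggregate` changed, yet each now has a different ordinal, so every one of their Task IDs changes too.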
> When the assignment of ordinals to Topic Groups changes, existing Tasks are
> invalidated, as they no longer correspond to the correct Topic Groups. Local
> state is located in directories that include the Task ID (e.g.
> {{/state/dir/2_14/mystore/rocksdb/…}}), and since the Tasks have all been
> invalidated, all existing local state directories are also invalid.
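A sketch of how a store directory could be derived from the Task ID, mirroring the example path `/state/dir/2_14/mystore/rocksdb/…` above. The `StateDirLayout.storeDir` helper is hypothetical and the exact layout Kafka Streams uses may differ; the sketch only shows that a shifted ordinal makes the same store resolve to a different directory.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper following the example layout <state dir>/<task id>/<store>/rocksdb. */
final class StateDirLayout {
    private StateDirLayout() {}

    static Path storeDir(String stateDir, String taskId, String storeName) {
        return Paths.get(stateDir, taskId, storeName, "rocksdb");
    }

    public static void main(String[] args) {
        // Before the topology change, "mystore" belongs to Topic Group 2.
        Path before = storeDir("/state/dir", "2_14", "mystore");
        // If an unrelated change moves that group to ordinal 3, the same store
        // is now expected under a different task directory, while "2_14"
        // belongs to whichever group now has ordinal 2.
        Path after = storeDir("/state/dir", "3_14", "mystore");
        System.out.println(before);
        System.out.println(after);
    }
}
```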
> Attempting to start an application that has undergone these ordinal changes,
> without first clearing the local state, will cause Kafka Streams to attempt
> to use the existing local state for the wrong Tasks. Kafka Streams detects
> this discrepancy and prevents the application from starting.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)