[ 
https://issues.apache.org/jira/browse/KAFKA-19935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18041421#comment-18041421
 ] 

sanghyeok An commented on KAFKA-19935:
--------------------------------------

[~mjsax] Hi!


Thank you for the thoughtful feedback. 
I completely agree with your point regarding the false positive alert. 
Attempting to mathematically guarantee 100% compatibility for all logical 
topology shifts is indeed extremely difficult and could be risky if users 
blindly trust it.

To address this and the implementation concerns, 
I propose refining the scope and approach in two key ways:


*1. Implementation Strategy: Snapshot Testing*

You rightly asked how we would implement the detection since we cannot 
instantiate the previous topology object within the modified codebase.
I propose adopting a Snapshot Testing pattern (similar to Jest snapshot testing 
in frontend development, or Approval Testing in Java).
* *Baseline:* The user saves the stable {{TopologyDescription}} (via 
{{topology.describe().toString()}}) as a text file in the repository.
* *Validation:* During unit tests or CI, the validator parses this 
baseline file and compares it against the _current_ {{Topology}} structure.

This approach sidesteps the "time paradox" of needing the old topology object 
at validation time, and allows a reliable comparison of structural changes.
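
A minimal sketch of the baseline/validation loop, using only the JDK. It assumes the description string comes from {{topology.describe().toString()}} and that the user chooses the baseline path; {{TopologySnapshot}}, {{verify}} and {{Outcome}} are hypothetical names, not part of Kafka Streams. This version compares the whole description text, which is the strictest possible check; the rules in section 2 are meant to narrow it down.
{code:java}
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper illustrating the snapshot ("approval") pattern; not a Kafka Streams API.
final class TopologySnapshot {

    enum Outcome { BASELINE_CREATED, MATCHES, CHANGED }

    private TopologySnapshot() { }

    /**
     * Compares the current topology description with the stored baseline.
     * If no baseline exists yet, the current description is saved as the new baseline.
     */
    static Outcome verify(Path baseline, String currentDescription) {
        String current = normalize(currentDescription);
        try {
            if (Files.notExists(baseline)) {
                Path parent = baseline.getParent();
                if (parent != null) {
                    Files.createDirectories(parent);
                }
                Files.writeString(baseline, current, StandardCharsets.UTF_8);
                return Outcome.BASELINE_CREATED;
            }
            String previous = normalize(Files.readString(baseline, StandardCharsets.UTF_8));
            return previous.equals(current) ? Outcome.MATCHES : Outcome.CHANGED;
        } catch (IOException e) {
            throw new UncheckedIOException("Cannot access topology baseline " + baseline, e);
        }
    }

    // Unify line endings and trim surrounding whitespace so the check is stable across platforms.
    private static String normalize(String description) {
        return description.replace("\r\n", "\n").strip();
    }
}
{code}
In a test, a {{CHANGED}} outcome would fail the build; deleting the baseline file and re-running re-records it, which is the usual approval-testing workflow.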

 

*2. Scope: Modular, Rule-Based Validator* 

Instead of a generic {{.diff()}} that tries to interpret intent, we can offer 
specific, opt-in rules. 
The primary use case is not to validate logic but to catch *unintentional 
side effects*.
* {{StateStoreNameCompatibility}} (Critical): This rule strictly 
compares the set of StateStore names. If {{STORE-0001}} disappears and 
{{STORE-0002}} appears, it flags a violation.
** _Goal:_ Prevent accidental data loss due to ID shifting.
* {{SourceTopicCompatibility}}: Ensures that the set of source topics remains 
consistent.
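
The ID shifting that {{StateStoreNameCompatibility}} is meant to catch can be reproduced with a toy model of the auto-naming scheme. This is a simplified sketch, not Kafka Streams code: it assumes, as the names in the ticket suggest, that a single builder-wide counter numbers every generated processor and store name (zero-padded to ten digits), and that the changelog topic is named {{<application.id>-<storeName>-changelog}}. {{NamingModel}} and {{changelogFor}} are hypothetical.
{code:java}
// Toy model of counter-based auto-naming; NOT the real Kafka Streams implementation.
final class NamingModel {
    private int index = 0; // one counter shared by every generated name

    private String next(String prefix) {
        return prefix + String.format("%010d", index++);
    }

    /**
     * Models "sourceCount source nodes, then an aggregation" and returns the changelog
     * topic of the aggregation's auto-named store.
     */
    static String changelogFor(String applicationId, int sourceCount) {
        NamingModel names = new NamingModel();
        for (int i = 0; i < sourceCount; i++) {
            names.next("KSTREAM-SOURCE-");
        }
        // In this model the store name is generated just before the aggregate processor name.
        String store = names.next("KSTREAM-AGGREGATE-STATE-STORE-");
        names.next("KSTREAM-AGGREGATE-");
        return applicationId + "-" + store + "-changelog";
    }
}
{code}
{{changelogFor("MY-APP", 1)}} returns {{MY-APP-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog}}; with one more source defined in front, {{changelogFor("MY-APP", 2)}} returns {{MY-APP-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog}}, the same shift as in the ticket's example.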

When a rule is violated, the validator can either throw an error or log a WARN 
message. New rules can also be added later by implementing a common Rule interface.
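
To make the rule idea concrete, here is a hedged sketch of opt-in rules that work directly on the text of {{TopologyDescription#toString()}}. It assumes the current text format, in which processors list their stores as {{(stores: [a, b])}} and sources list their topics as {{(topics: [a, b])}}; that format is not a documented, stable contract, so a real utility would need to pin it down. All class names are hypothetical, a plain constructor replaces the proposed builder for brevity, and {{java.util.logging}} stands in for whatever logger the final utility would use.
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical rule interface: compares two TopologyDescription#toString() outputs.
interface TopologyRule {
    /** Returns a violation message, or empty if the rule is satisfied. */
    Optional<String> check(String previousDescription, String currentDescription);

    /** Collects the names inside every "label: [a, b, ...]" entry of a description. */
    static Set<String> extractNames(String description, String label) {
        Matcher matcher = Pattern.compile(Pattern.quote(label) + ": \\[([^\\]]*)\\]").matcher(description);
        Set<String> names = new TreeSet<>(); // sorted, so messages are deterministic
        while (matcher.find()) {
            for (String name : matcher.group(1).split(",")) {
                if (!name.isBlank()) {
                    names.add(name.strip());
                }
            }
        }
        return names;
    }

    /** Reports a violation when the two name sets differ. */
    static Optional<String> compareSets(String what, Set<String> previous, Set<String> current) {
        return previous.equals(current)
                ? Optional.empty()
                : Optional.of(what + " have changed. previous: " + previous + ", current: " + current);
    }
}

// Flags any change in the set of state store names, e.g. STORE-0001 replaced by STORE-0002.
final class StateStoreNameCompatibility implements TopologyRule {
    @Override
    public Optional<String> check(String previous, String current) {
        return TopologyRule.compareSets("StateStore names",
                TopologyRule.extractNames(previous, "stores"), TopologyRule.extractNames(current, "stores"));
    }
}

// Flags any change in the set of topics read by source nodes.
final class SourceTopicCompatibility implements TopologyRule {
    @Override
    public Optional<String> check(String previous, String current) {
        return TopologyRule.compareSets("Source topics",
                TopologyRule.extractNames(previous, "topics"), TopologyRule.extractNames(current, "topics"));
    }
}

// Simplified stand-in for the proposed TopologyValidator: runs all rules, then throws or warns.
final class TopologyValidator {
    enum Mode { THROW, WARN }

    private static final Logger LOG = Logger.getLogger(TopologyValidator.class.getName());
    private final List<TopologyRule> rules;
    private final Mode mode;

    TopologyValidator(List<? extends TopologyRule> rules, Mode mode) {
        this.rules = List.copyOf(rules);
        this.mode = mode;
    }

    /** Returns all violations (empty if compatible); in THROW mode any violation is thrown instead. */
    List<String> check(String previousDescription, String currentDescription) {
        List<String> violations = new ArrayList<>();
        for (TopologyRule rule : rules) {
            rule.check(previousDescription, currentDescription).ifPresent(violations::add);
        }
        if (!violations.isEmpty()) {
            String report = String.join("\n", violations);
            if (mode == Mode.THROW) {
                throw new IllegalStateException(report);
            }
            LOG.warning(report);
        }
        return violations;
    }
}
{code}
With {{Mode.THROW}} a violation fails the unit test immediately; with {{Mode.WARN}} the run continues and only the log shows the difference, mirroring the two options discussed for the API.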

 

*3. User Adoption*
Regarding user adoption, you are right that users unaware of Named Stores might 
miss this tool. 
However, if it is integrated into {{streams-test-utils}}, it can act as a 
Topology Linter in local unit tests.


When a developer runs a test, they would get immediate feedback. For 
instance:

*Error: StateStore names have changed. Did you mean to name your operators? See 
[link] for details.*

This effectively teaches the user why naming stores matters, instead of 
silently allowing the change.
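
A linter message like the one above could be built roughly as follows. This is a sketch: {{StoreNameLint}} and {{storeNameChangeMessage}} are hypothetical, the pattern for auto-generated names ({{...-STATE-STORE-}} followed by ten digits) is an assumption based on the names in the ticket, and {{[link]}} is kept as a placeholder because the target page is not specified. The naming hint is only added when an auto-generated name is involved, since explicitly named stores are already stable.
{code:java}
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical linter message builder; not an existing Kafka Streams API.
final class StoreNameLint {

    // Assumed shape of auto-generated store names, e.g. KSTREAM-AGGREGATE-STATE-STORE-0000000001.
    private static final Pattern GENERATED = Pattern.compile(".*-STATE-STORE-\\d{10}");

    private StoreNameLint() { }

    /** Returns a linter-style error message, or empty if the store names are unchanged. */
    static Optional<String> storeNameChangeMessage(Set<String> previousStores, Set<String> currentStores) {
        Set<String> removed = new TreeSet<>(previousStores);
        removed.removeAll(currentStores);
        Set<String> added = new TreeSet<>(currentStores);
        added.removeAll(previousStores);
        if (removed.isEmpty() && added.isEmpty()) {
            return Optional.empty();
        }
        StringBuilder message = new StringBuilder("Error: StateStore names have changed (removed: ")
                .append(removed).append(", added: ").append(added).append(").");
        // Only suggest explicit naming when an auto-generated name is involved.
        boolean generated = removed.stream().anyMatch(n -> GENERATED.matcher(n).matches())
                || added.stream().anyMatch(n -> GENERATED.matcher(n).matches());
        if (generated) {
            message.append(" Did you mean to name your operators? See [link] for details.");
        }
        return Optional.of(message.toString());
    }
}
{code}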

 

*4. API Usage*
{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

// TopologyValidator, TopologyValidatorBuilder and the rule classes are the proposed
// (not yet existing) API. `builder` is the application's StreamsBuilder.
@Test
public void shouldKeepTopologyCompatible() throws IOException {
    String previousDescription =
        Files.readString(Paths.get("src/test/resources/topology-baseline.txt"));
    String currentDescription = builder.build().describe().toString();

    TopologyValidator validator = new TopologyValidatorBuilder()
                                    .addRule(new StateStoreNameCompatibility())
                                    .addRule(new SourceTopicCompatibility())
                                    .build();

    // Option 1: the validator throws an error such as StateStoreNameCompatibilityViolation.
    // Option 2: the validator logs a WARN message such as
    //           "StateStore names have changed. previous: [A, B], current: [A, C]".
    validator.check(previousDescription, currentDescription);
} {code}
This utility would be valuable for both local development (fast feedback) and 
CI pipelines (safety net).

 

 

In summary, by narrowing the scope to Snapshot-based Structural Validation, 
I believe we can provide a high-value utility that prevents common operational 
disasters without over-promising logical compatibility.
What do you think about this "Snapshot & Rule-based" direction?

> Introduce TopologyValidator Utils for improving topology evolution 
> compatibility.
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-19935
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19935
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams-test-utils
>            Reporter: sanghyeok An
>            Assignee: sanghyeok An
>            Priority: Minor
>              Labels: needs-kip
>
> If a developer does not explicitly specify a name for a {{StateStore}}, 
> its name is generated using an incremental number. Consequently, the 
> corresponding Changelog topic is created using this same generated name.
> Let's assume a scenario where the application evolves and a new Source Node 
> is added. Since the new Source Node is typically built at the beginning of 
> the topology, the incremental numbers for all subsequent nodes shift by 1. As 
> a result, the names of the {{StateStore}}s change as well.
> For example, the change might look like this:
>  * *Previous:* {{MY-APP-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog}}
>  * *New:* {{MY-APP-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog}}
>  
> This shift can lead to significant operational issues. Simply replaying the 
> Source Topic is often insufficient to make the new {{StateStore}} identical 
> to the previous one.
>  
> There are several scenarios where this is impossible. For instance, if the 
> application only processes messages after the last commit (the most common case) and the
> data includes keys that appear infrequently, the state will never match. 
> (Consider a restaurant with very few orders compared to one with many; the 
> "sparse" orders might have existed in the previous StateStore but will likely 
> be missing in the new one after the shift).
> While this might be dismissed as a minor issue by some, it can be a critical 
> problem for organizations where data consistency is paramount.
>  
> A clear solution to this problem is using *Named StateStores*. However, 
> some users may not be aware of this feature or may not feel the need for it. 
> Furthermore, they might not realize that the IDs of the StateStore and 
> Changelog topic have incremented, leading to the unintentional creation of a 
> new store and potential data loss.
> To address this, I propose introducing a {{TopologyValidator}} as a utility 
> class. Ideally, the usage would look something like this:
> {code:java}
> TopologyValidator.of(prevTopology, newTopology).diff(); {code}
>  * When {{.diff()}} is called, the {{TopologyValidator}} would identify 
> changes in the topology and issue a warning if {{StateStore}} IDs have 
> shifted.
>  * The {{TopologyValidator}} could be included in the Kafka Streams 
> application code or integrated into CI pipelines. Since it essentially 
> requires only the code to build the topology, it does not need a connection 
> to an actual broker. Therefore, we can verify the diff using just the 
> previous and new topology definitions.
>  
> By introducing {{TopologyValidator}} under the {{streams/test-utils}} module 
> and encouraging users to utilize it, I believe we can promote greater 
> stability from the perspective of *Topology Evolution Compatibility*.
>  
> Please give your opinion. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
