chickenchickenlove opened a new pull request, #20968:
URL: https://github.com/apache/kafka/pull/20968

   ### Description
   
   This PR refactors the `DSL`'s KTable source optimization (reusing the input 
topic as the changelog) so that it uses the public 
`Topology#addReadOnlyStateStore` API introduced in KIP-813 instead of internal 
builder methods.
   
   ### Key Changes
   - Refactored `TableSourceNode`:
     - Replaced the manual wiring of sources and stores (via `addSource`, 
`addProcessor`, `connectSourceStoreAndTopic`) with a call to 
`topologyBuilder.addReadOnlyStateStore`.
   - Updated `InternalTopologyBuilder`:
     - Added an overloaded `addReadOnlyStateStore` method accepting a 
`ProcessorSupplier` with wildcard output types `(?, ?)`.
     - Reasoning: The public PAPI `addReadOnlyStateStore` enforces `Void, Void` 
output types, i.e. the processor is typed as forwarding nothing. However, the 
`DSL`'s `KTableSource` needs to forward records downstream (emitting 
`K, Change<V>`). The internal overload lets the `DSL` reuse the "read-only" 
wiring logic while keeping its forwarding behavior.
   - Added Regression Tests:
     - Added 
`TopologyTest#whenKTableSourceIsOptimizedThenTopologyShouldBeSerialPipeline` - 
Verifies that the generated topology description remains identical to the 
pre-refactoring structure, ensuring strict backward compatibility.
     - Added 
`TopologyTest#whenKTableSourceIsOptimizedThenItsStateStoreShouldNotLog` - 
Confirms that the optimization is correctly applied (changelog logging is 
disabled).
   
   ### Result
   - Fixes https://issues.apache.org/jira/browse/KAFKA-16366

