mjsax commented on a change in pull request #6824:
URL: https://github.com/apache/kafka/pull/6824#discussion_r427603579
##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
##########
@@ -700,12 +703,29 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl
         final StatefulProcessorNode<? super K, ? super V> processNode = new StatefulProcessorNode<>(
             name,
             new ProcessorParameters<>(processorSupplier, name),
-            stateStoreNames
+            getStoreNamesAndMaybeAddStores(processorSupplier, stateStoreNames)
         );

         builder.addGraphNode(this.streamsGraphNode, processNode);
     }

+    /**
+     * Provides store names that should be connected to a {@link StatefulProcessorNode}, from two sources:
+     * 1) Store names are provided as arguments to process(...), transform(...), etc.
+     * 2) {@link StoreBuilder}s are provided by the Processor/TransformerSupplier itself, by returning a set from
+     * {@link ConnectedStoreProvider#stores()}. The {@link StoreBuilder}s will also be added to the topology.
+     */
+    private String[] getStoreNamesAndMaybeAddStores(final ConnectedStoreProvider storeProvider, final String[] varargsStoreNames) {

Review comment:
Sorry for not being detailed enough. My bad (re-reading my comment, it does not really make sense). What I actually meant was: within `KStreamImpl` we should not access the `builder` at all.

Kafka Streams uses a two-phase translation. In the first phase, we only build up a logical representation of the topology (what we call the "stream graph", which consists of "stream graph nodes"). If optimization is enabled, the optimizer may modify the "stream graph" before it is translated into the actual topology in the second phase. The `StatefulProcessorNode` you create is one of those "stream graph nodes": in its `writeToTopology()` method, it adds its state store to the `Topology` and connects the processor to other stores if necessary. Thus, your code adds the store "too early" (while the stream graph is still being built) and returns a list of store names that the processor later connects to.
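To make the two-phase idea concrete, here is a minimal, stdlib-only sketch. Every class in it (`GraphNodeStub`, `StreamsBuilderStub`, `PhysicalTopologyStub`) is a hypothetical stand-in, not a Kafka Streams internal, and the "optimizer" is just a predicate that drops nodes. It only illustrates why a node should add its store inside `writeToTopology()` rather than when it is created.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical stub for the physical Topology: it only records what was added. */
class PhysicalTopologyStub {
    final List<String> processors = new ArrayList<>();
    final List<String> stores = new ArrayList<>();
}

/** Phase 1 artifact: a logical node; creating it does not touch any topology. */
class GraphNodeStub {
    final String processorName;
    final String storeName; // null for a stateless node

    GraphNodeStub(final String processorName, final String storeName) {
        this.processorName = processorName;
        this.storeName = storeName;
    }

    /** Phase 2: only here does the node add its processor (and store, if any). */
    void writeToTopology(final PhysicalTopologyStub topology) {
        topology.processors.add(processorName);
        if (storeName != null) {
            topology.stores.add(storeName);
        }
    }
}

/** Hypothetical stub for the builder that owns the logical stream graph. */
class StreamsBuilderStub {
    private final List<GraphNodeStub> streamGraph = new ArrayList<>();

    /** Phase 1: DSL operators only append logical nodes to the stream graph. */
    void addGraphNode(final GraphNodeStub node) {
        streamGraph.add(node);
    }

    /**
     * Phase 2: a stand-in "optimizer" first removes every node matching the predicate,
     * then each remaining node is translated into the physical topology.
     */
    PhysicalTopologyStub build(final Predicate<GraphNodeStub> optimizerRemoves) {
        streamGraph.removeIf(optimizerRemoves);
        final PhysicalTopologyStub topology = new PhysicalTopologyStub();
        for (final GraphNodeStub node : streamGraph) {
            node.writeToTopology(topology);
        }
        return topology;
    }
}

public class TwoPhaseTranslationSketch {
    public static void main(final String[] args) {
        final StreamsBuilderStub builder = new StreamsBuilderStub();
        builder.addGraphNode(new GraphNodeStub("source", null));
        builder.addGraphNode(new GraphNodeStub("redundant", "redundant-store"));
        builder.addGraphNode(new GraphNodeStub("aggregate", "agg-store"));

        // Pretend the optimizer decides that the "redundant" node can be dropped.
        final PhysicalTopologyStub topology = builder.build(n -> n.processorName.equals("redundant"));

        System.out.println(topology.processors); // [source, aggregate]
        System.out.println(topology.stores);     // [agg-store]
    }
}
```

In this toy model, the dropped node's store never reaches the topology because it is only added in `writeToTopology()`. Had it been added directly to the topology during phase 1, it would have been left behind after the optimizer removed the node.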
Instead, we should modify `StatefulProcessorNode` and delay adding the store to the topology until `writeToTopology()` is called. In particular, `StatefulProcessorNode` should have a constructor that accepts both an array of store names it should connect to and a `ConnectedStoreProvider`. Does this clarify things?
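The suggested constructor could look roughly like the following stdlib-only sketch. All names (`StatefulProcessorNodeSketch`, `StoreProviderStub`, `StoreBuilderStub`, `TopologyStub`) are hypothetical stand-ins, not the real Kafka Streams classes. `StoreProviderStub` only mirrors the shape of `ConnectedStoreProvider#stores()` from the diff, with the assumption that a `null` return means "no stores"; the stub's two topology methods are loosely modeled on `Topology#addStateStore` and `Topology#connectProcessorAndStateStores`. The constructor merely records both store sources; all topology work happens in `writeToTopology()`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stub for a StoreBuilder: it only carries the store name. */
class StoreBuilderStub {
    final String name;

    StoreBuilderStub(final String name) {
        this.name = name;
    }
}

/** Hypothetical stub mirroring ConnectedStoreProvider#stores(); null means "no stores" (assumption). */
interface StoreProviderStub {
    default Set<StoreBuilderStub> stores() {
        return null;
    }
}

/** Hypothetical stub for Topology: records registered stores and processor-store connections. */
class TopologyStub {
    final Set<String> registeredStores = new LinkedHashSet<>();
    final Map<String, Set<String>> connections = new LinkedHashMap<>();

    void addStateStore(final StoreBuilderStub storeBuilder, final String processorName) {
        registeredStores.add(storeBuilder.name);
        connectProcessorAndStateStores(processorName, storeBuilder.name);
    }

    void connectProcessorAndStateStores(final String processorName, final String... storeNames) {
        connections.computeIfAbsent(processorName, k -> new LinkedHashSet<>())
                   .addAll(Arrays.asList(storeNames));
    }
}

/** Sketch of the suggested constructor: take both sources, defer all topology work. */
class StatefulProcessorNodeSketch {
    private final String processorName;
    private final String[] storeNames;
    private final StoreProviderStub storeProvider;

    StatefulProcessorNodeSketch(final String processorName,
                                final String[] storeNames,
                                final StoreProviderStub storeProvider) {
        // Phase 1: only remember the inputs; no store is added anywhere yet.
        this.processorName = processorName;
        this.storeNames = storeNames;
        this.storeProvider = storeProvider;
    }

    /** Phase 2: add the provider's stores and connect every store to the processor. */
    void writeToTopology(final TopologyStub topology) {
        if (storeNames != null && storeNames.length > 0) {
            // Source 1: store names passed to process(...), transform(...), etc.
            topology.connectProcessorAndStateStores(processorName, storeNames);
        }
        final Set<StoreBuilderStub> provided = storeProvider == null ? null : storeProvider.stores();
        if (provided != null) {
            // Source 2: builders returned by the supplier's stores(); added only now.
            for (final StoreBuilderStub storeBuilder : provided) {
                topology.addStateStore(storeBuilder, processorName);
            }
        }
    }
}

public class DeferredStoreSketch {
    public static void main(final String[] args) {
        final StoreProviderStub supplier = new StoreProviderStub() {
            @Override
            public Set<StoreBuilderStub> stores() {
                return Collections.singleton(new StoreBuilderStub("provided-store"));
            }
        };
        final StatefulProcessorNodeSketch node = new StatefulProcessorNodeSketch(
            "my-processor", new String[] {"existing-store"}, supplier);

        final TopologyStub topology = new TopologyStub();
        System.out.println(topology.connections); // {}
        node.writeToTopology(topology);
        System.out.println(topology.connections); // {my-processor=[existing-store, provided-store]}
    }
}
```

Keeping the constructor free of side effects means that, in this sketch, the node can still be rewritten or dropped before any store is registered, which is the property the two-phase translation relies on.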