pgwhalen commented on a change in pull request #6824:
URL: https://github.com/apache/kafka/pull/6824#discussion_r427629040
##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
##########
@@ -700,12 +703,29 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl
         final StatefulProcessorNode<? super K, ? super V> processNode = new StatefulProcessorNode<>(
             name,
             new ProcessorParameters<>(processorSupplier, name),
-            stateStoreNames
+            getStoreNamesAndMaybeAddStores(processorSupplier, stateStoreNames)
         );

         builder.addGraphNode(this.streamsGraphNode, processNode);
     }

+    /**
+     * Provides store names that should be connected to a {@link StatefulProcessorNode}, from two sources:
+     * 1) Store names are provided as arguments to process(...), transform(...), etc.
+     * 2) {@link StoreBuilder}s are provided by the Processor/TransformerSupplier itself, by returning a set from
+     * {@link ConnectedStoreProvider#stores()}. The {@link StoreBuilder}s will also be added to the topology.
+     */
+    private String[] getStoreNamesAndMaybeAddStores(final ConnectedStoreProvider storeProvider, final String[] varargsStoreNames) {

Review comment:
Makes total sense, apologies for missing the subtlety. It's possible I wrote the first version of this code before there even was that extra optimization step, and never bothered to change my mental model of how it should work.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org