pgwhalen commented on a change in pull request #6824:
URL: https://github.com/apache/kafka/pull/6824#discussion_r427629040



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
##########
@@ -700,12 +703,29 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl
         final StatefulProcessorNode<? super K, ? super V> processNode = new StatefulProcessorNode<>(
                 name,
                 new ProcessorParameters<>(processorSupplier, name),
-                stateStoreNames
+                getStoreNamesAndMaybeAddStores(processorSupplier, stateStoreNames)
         );
 
         builder.addGraphNode(this.streamsGraphNode, processNode);
     }
 
+    /**
+     * Provides store names that should be connected to a {@link StatefulProcessorNode}, from two sources:
+     * 1) Store names are provided as arguments to process(...), transform(...), etc.
+     * 2) {@link StoreBuilder}s are provided by the Processor/TransformerSupplier itself, by returning a set from
+     * {@link ConnectedStoreProvider#stores()}.  The {@link StoreBuilder}s will also be added to the topology.
+     */
+    private String[] getStoreNamesAndMaybeAddStores(final ConnectedStoreProvider storeProvider, final String[] varargsStoreNames) {

Review comment:
       Makes total sense, apologies for missing the subtlety.  It's possible I 
wrote the first version of this code before there even was that extra 
optimization step, and never bothered to change my mental model of how it 
should work.
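
For anyone following the thread, here is a minimal sketch of the two-source merge that the Javadoc in the hunk above describes. It is not the code from this PR: `NamedStore` and `StoreProvider` are hypothetical stand-ins for `StoreBuilder` and `ConnectedStoreProvider`, `mergeStoreNames` is an illustrative name, and the sketch assumes `stores()` may return null when a supplier provides no stores. Adding the builders to the topology is deliberately left out.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

public class StoreNameMergeSketch {

    /** Hypothetical stand-in for StoreBuilder; only the store name matters here. */
    public interface NamedStore {
        String name();
    }

    /** Hypothetical stand-in for ConnectedStoreProvider; stores() is assumed to allow null. */
    public interface StoreProvider {
        Set<NamedStore> stores();
    }

    /**
     * Combines store names passed as varargs with the names of stores
     * supplied by the provider, keeping first-seen order and dropping duplicates.
     */
    public static String[] mergeStoreNames(final StoreProvider provider, final String[] varargsStoreNames) {
        // Source 1: names given directly to process(...), transform(...), etc.
        final Set<String> names = new LinkedHashSet<>(Arrays.asList(varargsStoreNames));

        // Source 2: stores supplied by the provider itself.
        final Set<NamedStore> provided = provider.stores();
        if (provided != null) {
            for (final NamedStore store : provided) {
                // A real implementation would also register the store with the topology here.
                names.add(store.name());
            }
        }
        return names.toArray(new String[0]);
    }

    public static void main(final String[] args) {
        final NamedStore counts = () -> "counts";
        final StoreProvider provider = () -> Collections.singleton(counts);
        System.out.println(Arrays.toString(mergeStoreNames(provider, new String[] {"a", "counts"})));
    }
}
```

Deduplicating by name is a choice made for this sketch only; whether a store named in both places should be accepted silently or rejected is a separate design question.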





