mjsax commented on a change in pull request #6824:
URL: https://github.com/apache/kafka/pull/6824#discussion_r427603579



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
##########
@@ -700,12 +703,29 @@ public void process(final ProcessorSupplier<? super K, ? 
super V> processorSuppl
         final StatefulProcessorNode<? super K, ? super V> processNode = new 
StatefulProcessorNode<>(
                 name,
                 new ProcessorParameters<>(processorSupplier, name),
-                stateStoreNames
+                getStoreNamesAndMaybeAddStores(processorSupplier, 
stateStoreNames)
         );
 
         builder.addGraphNode(this.streamsGraphNode, processNode);
     }
 
+    /**
+     * Provides store names that should be connected to a {@link 
StatefulProcessorNode}, from two sources:
+     * 1) Store names are provided as arguments to process(...), 
transform(...), etc.
+     * 2) {@link StoreBuilder}s are provided by the 
Processor/TransformerSupplier itself, by returning a set from
+     * {@link ConnectedStoreProvider#stores()}.  The {@link StoreBuilder}s 
will also be added to the topology.
+     */
+    private String[] getStoreNamesAndMaybeAddStores(final 
ConnectedStoreProvider storeProvider, final String[] varargsStoreNames) {

Review comment:
       Sorry for not being detailed enough. My bad (re-reading my comment it 
does not really make sense). What I actually meant was:
   
   Within `KStreamImpl` we should not access the `builder` at all. Kafka 
Streams uses a 2-phase translation: in the first phase, we only build up a 
logical representation of the topology (what we call the "stream graph" that 
comprises "stream graph node"). If optimization is enabled, the optimizer would 
potentially modify the "stream graph" before it gets translated into the actual 
topology.
   
   The created `StatefulProcessorNode` is one of those "stream graph nodes" -- 
in its `writeToTopology()` method it will add it's state store to the 
`Topology` and connect the processor to other stores if necessary.
   
   Thus, your code adds the store "too early" and returns a list of store names 
that the processor is later connecting to. Instead, we should modify 
`StatefulProcessorNode` and delay adding the store to the topology until 
`writeToTopology()` is called: in particular `StatefulProcessorNode` should 
have a constructor that accepts both an array of store names it should connect 
to and a `ConnectedStoreProvider`.
   
   Does this clarify?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to