mjsax commented on a change in pull request #6824:
URL: https://github.com/apache/kafka/pull/6824#discussion_r427603579
##########
File path:
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
##########
@@ -700,12 +703,29 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl
         final StatefulProcessorNode<? super K, ? super V> processNode = new StatefulProcessorNode<>(
             name,
             new ProcessorParameters<>(processorSupplier, name),
-            stateStoreNames
+            getStoreNamesAndMaybeAddStores(processorSupplier, stateStoreNames)
         );
         builder.addGraphNode(this.streamsGraphNode, processNode);
     }
+    /**
+     * Provides store names that should be connected to a {@link StatefulProcessorNode}, from two sources:
+     * 1) Store names are provided as arguments to process(...), transform(...), etc.
+     * 2) {@link StoreBuilder}s are provided by the Processor/TransformerSupplier itself, by returning a set from
+     * {@link ConnectedStoreProvider#stores()}. The {@link StoreBuilder}s will also be added to the topology.
+     */
+    private String[] getStoreNamesAndMaybeAddStores(final ConnectedStoreProvider storeProvider, final String[] varargsStoreNames) {
Review comment:
Sorry for not being detailed enough. My bad (re-reading my comment it
does not really make sense). What I actually meant was:
Within `KStreamImpl` we should not access the `builder` at all. Kafka
Streams uses a 2-phase translation: in the first phase, we only build up a
logical representation of the topology (what we call the "stream graph", which
consists of "stream graph nodes"). If optimization is enabled, the optimizer
may modify the "stream graph" before it gets translated into the actual
topology in the second phase.
The created `StatefulProcessorNode` is one of those "stream graph nodes" --
in its `writeToTopology()` method it will add its state store to the
`Topology` and connect the processor to other stores if necessary.
Thus, your code adds the store "too early" and returns a list of store names
that the processor later connects to. Instead, we should modify
`StatefulProcessorNode` and delay adding the store to the topology until
`writeToTopology()` is called: in particular, `StatefulProcessorNode` should
have a constructor that accepts both an array of store names it should connect
to and a `ConnectedStoreProvider`.
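To make this more concrete, here is a rough, self-contained sketch of the idea. It does not use the real Kafka Streams classes: `SketchStoreBuilder`, `SketchStoreProvider`, `SketchTopology`, and `SketchStatefulNode` are hypothetical stand-ins for `StoreBuilder`, `ConnectedStoreProvider`, the topology, and `StatefulProcessorNode`, and the real signatures will differ. The only point is that the node just remembers the store names and the provider when it is created, and touches the topology only in `writeToTopology()`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Simplified stand-ins only; none of these are the real Kafka Streams classes.
class DeferredStoreSketch {

    // Stand-in for StoreBuilder: only the store name matters here.
    static final class SketchStoreBuilder {
        final String name;

        SketchStoreBuilder(final String name) {
            this.name = name;
        }
    }

    // Stand-in for ConnectedStoreProvider: a supplier may bring its own stores.
    interface SketchStoreProvider {
        Set<SketchStoreBuilder> stores();
    }

    // Stand-in for the physical topology; it just records what was added.
    static final class SketchTopology {
        final List<String> addedStores = new ArrayList<>();
        final List<String> connectedStores = new ArrayList<>();

        void addStateStore(final SketchStoreBuilder store) {
            addedStores.add(store.name);
        }

        void connectProcessorAndStateStores(final String processorName, final String... storeNames) {
            connectedStores.addAll(Arrays.asList(storeNames));
        }
    }

    // Stand-in for StatefulProcessorNode. Phase 1: purely logical, no topology access.
    static final class SketchStatefulNode {
        private final String processorName;
        private final String[] storeNames;
        private final SketchStoreProvider storeProvider;

        SketchStatefulNode(final String processorName,
                           final String[] storeNames,
                           final SketchStoreProvider storeProvider) {
            this.processorName = processorName;
            this.storeNames = storeNames;
            this.storeProvider = storeProvider;
        }

        // Phase 2: only now are stores added to and connected in the topology.
        void writeToTopology(final SketchTopology topology) {
            // Stores passed by name are assumed to be added to the topology elsewhere.
            if (storeNames != null && storeNames.length > 0) {
                topology.connectProcessorAndStateStores(processorName, storeNames);
            }
            final Set<SketchStoreBuilder> provided = storeProvider.stores();
            if (provided != null) { // this sketch treats null as "no stores provided"
                for (final SketchStoreBuilder store : provided) {
                    topology.addStateStore(store);
                    topology.connectProcessorAndStateStores(processorName, store.name);
                }
            }
        }
    }

    public static void main(final String[] args) {
        final SketchStoreProvider supplier =
            () -> Collections.singleton(new SketchStoreBuilder("provided-store"));
        final SketchStatefulNode node =
            new SketchStatefulNode("my-processor", new String[] {"existing-store"}, supplier);
        final SketchTopology topology = new SketchTopology();

        System.out.println("before: " + topology.addedStores);
        node.writeToTopology(topology);
        System.out.println("added: " + topology.addedStores);
        System.out.println("connected: " + topology.connectedStores);
    }
}
```

With this shape, creating the node in `KStreamImpl` has no side effects on the topology, so the optimizer is still free to rewrite the graph before anything is added.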
Does this clarify?