rodesai commented on code in PR #17929:
URL: https://github.com/apache/kafka/pull/17929#discussion_r1857289699


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java:
##########
@@ -75,4 +76,79 @@ default void configure(final StreamsConfig config) {
 
     boolean isCompatibleWith(StoreFactory storeFactory);
 
+    class FactoryWrappingStoreBuilder<T extends StateStore> implements StoreBuilder<T> {

Review Comment:
   This gave me the spins. Previously the factories wrapped builders and 
suppliers so that we had a unified configuration interface. Now we're wrapping 
a factory in a builder, which is confusing and makes the code harder to reason 
about: when I have a builder instance, it could actually be a factory 
underneath. I'm not sure that causes any problems, but it was nice to have a 
non-circular wrapping hierarchy before. Would it be better to revert the DSL 
operators to create processor instances that return unwrapped builders, and 
then wrap them in a factory from `addStateStore`? For example, 
`KeyValueStoreMaterializer` would return a `StoreBuilder` like it used to, and 
`addStateStore` would then wrap it.
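
   A rough sketch of the one-directional wrapping I have in mind. Every type 
here (`StoreBuilder`, `StoreFactory`, `BuilderBackedFactory`, `Topology`, 
`materialize`) is a simplified, hypothetical stand-in, not the real Kafka 
Streams interface; the only point is that wrapping happens once, inside 
`addStateStore`, and only ever goes builder -> factory.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for illustration only; NOT the real Kafka Streams types.
public class WrappingSketch {

    // What a DSL materializer would hand back: a plain, unwrapped builder.
    interface StoreBuilder {
        String name();
    }

    // The unified configuration interface used inside the topology.
    interface StoreFactory {
        String name();
    }

    // The only wrapper in the hierarchy: a factory around a builder, never the reverse.
    static final class BuilderBackedFactory implements StoreFactory {
        private final StoreBuilder builder;

        BuilderBackedFactory(final StoreBuilder builder) {
            this.builder = builder;
        }

        @Override
        public String name() {
            return builder.name();
        }
    }

    static final class Topology {
        private final List<StoreFactory> factories = new ArrayList<>();

        // Wrapping happens here, at the single registration point.
        void addStateStore(final StoreBuilder builder) {
            factories.add(new BuilderBackedFactory(builder));
        }

        List<StoreFactory> factories() {
            return factories;
        }
    }

    // Hypothetical materializer that returns an unwrapped builder.
    static StoreBuilder materialize(final String storeName) {
        return () -> storeName;
    }

    public static void main(final String[] args) {
        final Topology topology = new Topology();
        topology.addStateStore(materialize("counts"));
        System.out.println(topology.factories().get(0).name());
    }
}
```

   With this shape a `StoreBuilder` is always just a builder, so nobody 
holding one has to wonder whether a factory is hiding underneath.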



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java:
##########
@@ -184,7 +184,7 @@ private KTable<K, Long> doCount(final Named named, final Materialized<K, Long, K
 
         final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
         return doAggregate(
-            new KStreamAggregate<>(materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+            new KStreamAggregate<>(materializedInternal, aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),

Review Comment:
   Don't we need to pass `null` for the store factory to `doAggregate` here? 
Otherwise we'll wind up adding the state store to the topology a second time 
from `StatefulProcessorNode`.
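
   To illustrate the concern with a toy model: the types and the 
`writeToTopology` helper below are hypothetical stand-ins, and the sketch 
assumes the topology rejects a second registration under the same store name. 
If the processor already declares its store and the node is also handed the 
same store, the store gets registered twice; passing `null` for the node-level 
store avoids that.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model for illustration only; NOT the real Kafka Streams classes.
public class DuplicateStoreSketch {

    static final class Topology {
        private final List<String> stores = new ArrayList<>();

        // Assumption: registering the same store name twice is an error.
        void addStateStore(final String name) {
            if (stores.contains(name)) {
                throw new IllegalStateException("store already added: " + name);
            }
            stores.add(name);
        }

        List<String> stores() {
            return stores;
        }
    }

    // Stand-in for a stateful node writing itself into the topology:
    // the processor contributes its own store, and the node may contribute one too.
    static void writeToTopology(final Topology topology,
                                final String storeFromProcessor,
                                final String storeFromNode) {
        topology.addStateStore(storeFromProcessor);
        if (storeFromNode != null) {
            topology.addStateStore(storeFromNode);
        }
    }

    public static void main(final String[] args) {
        // Passing null at the node level: the store is registered exactly once.
        final Topology ok = new Topology();
        writeToTopology(ok, "counts", null);
        System.out.println(ok.stores());

        // Passing the same store at both levels: the second registration fails.
        try {
            writeToTopology(new Topology(), "counts", "counts");
        } catch (final IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```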



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.