rodesai commented on code in PR #17929:
URL: https://github.com/apache/kafka/pull/17929#discussion_r1857289699

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java:
##########
@@ -75,4 +76,79 @@ default void configure(final StreamsConfig config) {
 
     boolean isCompatibleWith(StoreFactory storeFactory);
 
+    class FactoryWrappingStoreBuilder<T extends StateStore> implements StoreBuilder<T> {

Review Comment:
   This gave me the spins. Previously the factories would wrap builders and suppliers so that we could have a unified configuration interface. But now we're wrapping a factory in a builder, which is confusing and makes the code harder to reason about: when I have a builder instance, it could actually be a factory underneath (not sure if that causes any problems, but it was nice that there was a non-circular hierarchy of wrapping before).
   
   Would it be better to instead revert the DSL operators to create processor instances that return unwrapped builders, and then wrap them in a factory from `addStateStore`? So, for example, `KeyValueStoreMaterializer` would return a `StoreBuilder` like it used to, and then that gets wrapped by `addStateStore`?

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java:
##########
@@ -184,7 +184,7 @@ private KTable<K, Long> doCount(final Named named, final Materialized<K, Long, K
         final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
         return doAggregate(
-            new KStreamAggregate<>(materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+            new KStreamAggregate<>(materializedInternal, aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),

Review Comment:
   Don't we also need to change this to pass `null` for the store factory to `doAggregate`? Otherwise we'll wind up adding the state store to the topology again from `StatefulProcessorNode`.
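
To make the wrapping direction suggested in the first comment concrete, here is a rough, self-contained sketch. It does not use the real Kafka Streams types: `Store`, `Builder`, `Factory`, `KeyValueBuilder`, and `Topology` are hypothetical stand-ins invented for illustration, and the duplicate-name check is only an assumption about how a second registration of the same store might be rejected. The point it tries to show is that a builder is turned into a factory in exactly one place (`addStateStore`), and a factory is never wrapped back into a builder.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WrappingSketch {

    // Hypothetical stand-in for a state store.
    interface Store {
        String name();
    }

    // Hypothetical stand-in for a store builder: knows how to build one store.
    interface Builder {
        String name();
        Store build();
    }

    // Hypothetical stand-in for a store factory: the unified configuration layer.
    // It wraps a Builder; nothing wraps a Factory back into a Builder.
    static final class Factory {
        private final Builder inner;

        Factory(final Builder inner) {
            this.inner = inner;
        }

        String name() {
            return inner.name();
        }

        Store build() {
            return inner.build();
        }
    }

    // What a materializer would hand back under the proposal: a plain builder.
    static final class KeyValueBuilder implements Builder {
        private final String name;

        KeyValueBuilder(final String name) {
            this.name = name;
        }

        @Override
        public String name() {
            return name;
        }

        @Override
        public Store build() {
            return () -> name; // Store has a single method, so a lambda suffices
        }
    }

    static final class Topology {
        private final Map<String, Factory> factories = new LinkedHashMap<>();

        // The only place where a builder becomes a factory.
        void addStateStore(final Builder builder) {
            if (factories.containsKey(builder.name())) {
                // Assumed behaviour for this sketch: reject a second registration.
                throw new IllegalStateException("Store " + builder.name() + " was already added");
            }
            factories.put(builder.name(), new Factory(builder));
        }

        Factory factory(final String name) {
            return factories.get(name);
        }
    }

    public static void main(final String[] args) {
        final Topology topology = new Topology();
        topology.addStateStore(new KeyValueBuilder("counts"));
        System.out.println(topology.factory("counts").build().name());
    }
}
```

With this shape, any code holding a `Builder` knows it is not a factory in disguise, which is the non-circular hierarchy the comment asks to keep.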