ableegoldman commented on code in PR #17903:
URL: https://github.com/apache/kafka/pull/17903#discussion_r1853796622

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java:
##########
@@ -116,16 +103,16 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
                 consumedInternal().valueDeserializer(), topicName);
-            topologyBuilder.addProcessor(processorParameters.processorName(), processorParameters.processorSupplier(), sourceName);
+            processorParameters.addProcessorTo(topologyBuilder, new String[] {sourceName});
-            // only add state store if the source KTable should be materialized
+            // if the KTableSource should not be materialized, stores will be null or empty
             final KTableSource<K, V> tableSource = (KTableSource<K, V>) processorParameters.processorSupplier();
-            if (tableSource.materialized()) {
-                topologyBuilder.addStateStore(storeFactory, nodeName());
-
+            if (tableSource.stores() != null) {
                 if (shouldReuseSourceTopicForChangelog) {
-                    storeFactory.withLoggingDisabled();
-                    topologyBuilder.connectSourceStoreAndTopic(storeFactory.name(), topicName);
+                    tableSource.stores().forEach(store -> {
+                        store.withLoggingDisabled();
+                        topologyBuilder.connectSourceStoreAndTopic(store.name(), topicName);

Review Comment:
Btw, I noticed that this is actually called inside #addStateStore, at least as long as the parent processor names are passed in. That should always be the case except (1) with global tables/stores, and (2) when adding stores to a Topology directly via #addStateStore and connecting them manually using #connectProcessorAndStateStores (i.e. the alternative to implementing ProcessorSupplier#stores). In both of those cases #connectSourceStoreAndTopic is called directly, so AFAICT there's no reason to be invoking `topologyBuilder.connectSourceStoreAndTopic` all over the place, including right here. Granted, it's idempotent so calling it again is fine, but it makes the already-messy topology building code even more confusing.
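
To illustrate the redundancy, here is a toy model only, not the real InternalTopologyBuilder. `ToyBuilder`, `addStore`, `connectStoreToTopic`, and the extra topic parameter are made-up names and assumptions for the sketch; it just mirrors the behavior described above, where adding a store with parent processors already performs the connect step, so a second explicit call changes nothing.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the builder; names and signatures are illustrative only.
class ToyBuilder {
    private final Map<String, String> storeToTopic = new HashMap<>();

    // Assumption for this sketch: when parent processors are given,
    // adding the store also connects it to its source topic.
    void addStore(final String storeName, final String sourceTopic, final String... parentProcessors) {
        if (parentProcessors.length > 0) {
            connectStoreToTopic(storeName, sourceTopic);
        }
    }

    // Idempotent in this model: repeating the same mapping leaves the state unchanged.
    void connectStoreToTopic(final String storeName, final String topic) {
        storeToTopic.put(storeName, topic);
    }

    // Defensive copy so callers can compare states before and after a call.
    Map<String, String> snapshot() {
        return new HashMap<>(storeToTopic);
    }
}

class ToyBuilderDemo {
    public static void main(final String[] args) {
        final ToyBuilder builder = new ToyBuilder();
        builder.addStore("table-store", "input-topic", "table-source-processor");
        final Map<String, String> before = builder.snapshot();

        // The extra explicit call, analogous to the one in the diff above.
        builder.connectStoreToTopic("table-store", "input-topic");

        System.out.println("state unchanged by second call: " + before.equals(builder.snapshot()));
    }
}
```

In this model the explicit call is harmless but adds nothing, which is the point about it being noise in the topology building code.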
Might be nice to remove all these extraneous calls (can be done in a separate PR so we can make sure it doesn't break anything).

##########
streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java:
##########
@@ -384,7 +384,8 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
         final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
             new MaterializedInternal<>(
                 Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()),
-                internalStreamsBuilder, topic + "-");
+                internalStreamsBuilder, topic + "-",

Review Comment:
nit: can you fix this line to put params on new lines?

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java:
##########
@@ -75,4 +76,78 @@ default void configure(final StreamsConfig config) {
     boolean isCompatibleWith(StoreFactory storeFactory);
+    class FactoryWrappingStoreBuilder<T extends StateStore> implements StoreBuilder<T> {
+
+        private final StoreFactory storeFactory;
+
+        public FactoryWrappingStoreBuilder(final StoreFactory storeFactory) {
+            this.storeFactory = storeFactory;
+        }
+
+        public StoreFactory storeFactory() {
+            return storeFactory;
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            final FactoryWrappingStoreBuilder<?> that = (FactoryWrappingStoreBuilder<?>) o;
+
+            return storeFactory.isCompatibleWith(that.storeFactory);
+        }
+
+        @Override
+        public int hashCode() {
+            return storeFactory.hashCode();
+        }
+
+        @Override
+        public StoreBuilder<T> withCachingEnabled() {
+            throw new IllegalStateException("Should not try to modify StoreBuilder wrapper");
+        }
+
+        @Override
+        public StoreBuilder<T> withCachingDisabled() {
+            throw new IllegalStateException("Should not try to modify StoreBuilder wrapper");

Review Comment:
I'm not 100% sure it's necessary, but I think we might as well delegate to the
StoreFactory here as well. If the StoreFactory has/needs this method for whatever reason, then it seems theoretically possible for it to be called during the topology building process (whereas imo it's highly unlikely for withLoggingEnabled or withCachingEnabled to be called on this, especially if those methods don't even exist on StoreFactory).

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java:
##########
@@ -39,12 +39,19 @@ public MaterializedInternal(final Materialized<K, V, S> materialized) {
     public MaterializedInternal(final Materialized<K, V, S> materialized,
                                 final InternalNameProvider nameProvider,
                                 final String generatedStorePrefix) {
+        this(materialized, nameProvider, generatedStorePrefix, false);
+    }
+
+    public MaterializedInternal(final Materialized<K, V, S> materialized,
+                                final InternalNameProvider nameProvider,
+                                final String generatedStorePrefix,
+                                final boolean forceQueryable) {

Review Comment:
nice refactor, this is way cleaner/easier to follow than randomly overriding the queryableStoreName for things like global stores

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java:
##########
@@ -40,15 +43,16 @@ public class KTableSource<KIn, VIn> implements ProcessorSupplier<KIn, VIn, KIn,
     private static final Logger LOG = LoggerFactory.getLogger(KTableSource.class);
     private final String storeName;
+    private final StoreFactory storeFactory;
     private String queryableName;
     private boolean sendOldValues;
-    public KTableSource(final String storeName, final String queryableName) {
-        Objects.requireNonNull(storeName, "storeName can't be null");

Review Comment:
we should probably keep this check