ableegoldman commented on code in PR #17903:
URL: https://github.com/apache/kafka/pull/17903#discussion_r1853796622


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java:
##########
@@ -116,16 +103,16 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
                                       consumedInternal().valueDeserializer(),
                                       topicName);
 
-            topologyBuilder.addProcessor(processorParameters.processorName(), processorParameters.processorSupplier(), sourceName);
+            processorParameters.addProcessorTo(topologyBuilder, new String[] {sourceName});
 
-            // only add state store if the source KTable should be materialized
+            // if the KTableSource should not be materialized, stores will be null or empty
             final KTableSource<K, V> tableSource = (KTableSource<K, V>) processorParameters.processorSupplier();
-            if (tableSource.materialized()) {
-                topologyBuilder.addStateStore(storeFactory, nodeName());
-
+            if (tableSource.stores() != null) {
                 if (shouldReuseSourceTopicForChangelog) {
-                    storeFactory.withLoggingDisabled();
-                    topologyBuilder.connectSourceStoreAndTopic(storeFactory.name(), topicName);
+                    tableSource.stores().forEach(store -> {
+                        store.withLoggingDisabled();
+                        topologyBuilder.connectSourceStoreAndTopic(store.name(), topicName);

Review Comment:
   Btw, I noticed that this is actually called inside #addStateStore, at least as long as the parent processor names are passed in. That should always be the case, with two exceptions: global tables/stores, and stores that are added to a Topology directly via #addStateStore and connected manually using #connectProcessorToStateStores (ie the alternative to implementing ProcessorSupplier#stores).
   
   In both of those cases #connectSourceStoreAndTopic is called directly, so AFAICT there's no reason to be invoking `topologyBuilder.connectSourceStoreAndTopic` all over the place, including right here.
   
   Granted, it's idempotent so calling it again is harmless, but it makes the already-messy topology building code even more confusing. Might be nice to remove all these extraneous calls (can be done in a separate PR so we can make sure it doesn't break anything).
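
   To illustrate why a repeated call is redundant rather than harmful, here is a toy model only. `SketchTopologyBuilder`, its simplified `addStateStore` signature, and the map-based bookkeeping are all hypothetical and are not taken from the real InternalTopologyBuilder; the sketch just assumes the behaviour described in this comment.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the builder; NOT the real InternalTopologyBuilder.
class SketchTopologyBuilder {
    private final Map<String, String> storeToTopic = new HashMap<>();

    // Assumed idempotent: registering the same (store, topic) pair twice changes nothing.
    void connectSourceStoreAndTopic(final String storeName, final String topic) {
        storeToTopic.put(storeName, topic);
    }

    // Assumed behaviour: when parent processor names are passed in,
    // adding the store also connects it to its source topic.
    void addStateStore(final String storeName, final String sourceTopic, final String... processorNames) {
        if (processorNames.length > 0) {
            connectSourceStoreAndTopic(storeName, sourceTopic);
        }
    }

    Map<String, String> connections() {
        return storeToTopic;
    }
}

class IdempotentConnectDemo {
    public static void main(final String[] args) {
        final SketchTopologyBuilder builder = new SketchTopologyBuilder();
        builder.addStateStore("table-store", "input-topic", "table-source-processor");

        final Map<String, String> before = new HashMap<>(builder.connections());
        // The explicit call that the comment considers extraneous.
        builder.connectSourceStoreAndTopic("table-store", "input-topic");

        System.out.println("state unchanged: " + before.equals(builder.connections()));
    }
}
```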



##########
streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java:
##########
@@ -384,7 +384,8 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
         final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
             new MaterializedInternal<>(
                 Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()),
-                internalStreamsBuilder, topic + "-");
+                internalStreamsBuilder, topic + "-",

Review Comment:
   nit: can you fix this line to put params on new lines?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java:
##########
@@ -75,4 +76,78 @@ default void configure(final StreamsConfig config) {
 
     boolean isCompatibleWith(StoreFactory storeFactory);
 
+    class FactoryWrappingStoreBuilder<T extends StateStore> implements StoreBuilder<T> {
+
+        private final StoreFactory storeFactory;
+
+        public FactoryWrappingStoreBuilder(final StoreFactory storeFactory) {
+            this.storeFactory = storeFactory;
+        }
+
+        public StoreFactory storeFactory() {
+            return storeFactory;
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            final FactoryWrappingStoreBuilder<?> that = (FactoryWrappingStoreBuilder<?>) o;
+
+            return storeFactory.isCompatibleWith(that.storeFactory);
+        }
+
+        @Override
+        public int hashCode() {
+            return storeFactory.hashCode();
+        }
+
+        @Override
+        public StoreBuilder<T> withCachingEnabled() {
+            throw new IllegalStateException("Should not try to modify StoreBuilder wrapper");
+        }
+
+        @Override
+        public StoreBuilder<T> withCachingDisabled() {
+            throw new IllegalStateException("Should not try to modify StoreBuilder wrapper");

Review Comment:
   I'm not 100% sure it's necessary, but I think we might as well delegate to the StoreFactory here as well. If the StoreFactory has/needs this method for whatever reason, then it seems theoretically possible for it to be called during the topology building process (whereas imo it's highly unlikely for withLoggingEnabled or withCachingEnabled to be called on this, especially if those methods don't even exist on StoreFactory).
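
   A rough sketch of the delegation suggested here, under stated assumptions: `SketchStoreFactory`, `SketchInMemoryFactory` and `SketchWrappingBuilder` are hypothetical, trimmed-down stand-ins, and whether the real StoreFactory exposes a `withCachingDisabled` method is assumed rather than confirmed by this diff.

```java
// Hypothetical, trimmed-down stand-in for StoreFactory; the real interface has more methods.
interface SketchStoreFactory {
    void withCachingDisabled();

    boolean cachingEnabled();
}

// Minimal implementation so the sketch can run on its own.
class SketchInMemoryFactory implements SketchStoreFactory {
    private boolean cachingEnabled = true;

    @Override
    public void withCachingDisabled() {
        cachingEnabled = false;
    }

    @Override
    public boolean cachingEnabled() {
        return cachingEnabled;
    }
}

// Hypothetical stand-in for the wrapper in the diff.
class SketchWrappingBuilder {
    private final SketchStoreFactory storeFactory;

    SketchWrappingBuilder(final SketchStoreFactory storeFactory) {
        this.storeFactory = storeFactory;
    }

    // Delegates instead of throwing, since the factory itself supports the operation.
    SketchWrappingBuilder withCachingDisabled() {
        storeFactory.withCachingDisabled();
        return this;
    }

    // Still rejected: the factory in this sketch has no matching method to delegate to.
    SketchWrappingBuilder withCachingEnabled() {
        throw new IllegalStateException("Should not try to modify StoreBuilder wrapper");
    }
}

class DelegationDemo {
    public static void main(final String[] args) {
        final SketchInMemoryFactory factory = new SketchInMemoryFactory();
        new SketchWrappingBuilder(factory).withCachingDisabled();
        System.out.println("caching enabled: " + factory.cachingEnabled());
    }
}
```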



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java:
##########
@@ -39,12 +39,19 @@ public MaterializedInternal(final Materialized<K, V, S> materialized) {
     public MaterializedInternal(final Materialized<K, V, S> materialized,
                                 final InternalNameProvider nameProvider,
                                 final String generatedStorePrefix) {
+        this(materialized, nameProvider, generatedStorePrefix, false);
+    }
+
+    public MaterializedInternal(final Materialized<K, V, S> materialized,
+                                final InternalNameProvider nameProvider,
+                                final String generatedStorePrefix,
+                                final boolean forceQueryable) {

Review Comment:
   nice refactor, this is way cleaner/easier to follow than randomly overriding the queryableStoreName for things like global stores



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java:
##########
@@ -40,15 +43,16 @@ public class KTableSource<KIn, VIn> implements ProcessorSupplier<KIn, VIn, KIn,
     private static final Logger LOG = LoggerFactory.getLogger(KTableSource.class);
 
     private final String storeName;
+    private final StoreFactory storeFactory;
     private String queryableName;
     private boolean sendOldValues;
 
-    public KTableSource(final String storeName, final String queryableName) {
-        Objects.requireNonNull(storeName, "storeName can't be null");

Review Comment:
    we should probably keep this check
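
   For reference, a minimal sketch of keeping the check in a constructor. `SketchTableSource` is a hypothetical stand-in; the real KTableSource constructor signature after this PR is not shown in the hunk, so only the `storeName` parameter and the message from the removed line are reused.

```java
import java.util.Objects;

// Hypothetical stand-in for KTableSource; only the null check is illustrated.
class SketchTableSource {
    private final String storeName;

    SketchTableSource(final String storeName) {
        // Fail fast at construction time instead of with a later, harder-to-trace NPE.
        this.storeName = Objects.requireNonNull(storeName, "storeName can't be null");
    }

    String storeName() {
        return storeName;
    }
}

class NullCheckDemo {
    public static void main(final String[] args) {
        try {
            new SketchTableSource(null);
        } catch (final NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```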



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.