ableegoldman commented on code in PR #18155:
URL: https://github.com/apache/kafka/pull/18155#discussion_r1883007521


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java:
##########
@@ -33,20 +38,31 @@ class KTableMapValues<KIn, VIn, VOut> implements KTableProcessorSupplier<KIn, VI
     private final ValueMapperWithKey<? super KIn, ? super VIn, ? extends VOut> mapper;
     private final String queryableName;
     private boolean sendOldValues = false;
+    private final StoreFactory storeFactory;
 
     KTableMapValues(final KTableImpl<KIn, ?, VIn> parent,
                     final ValueMapperWithKey<? super KIn, ? super VIn, ? extends VOut> mapper,
-                    final String queryableName) {
+                    final String queryableName,
+                    final StoreFactory storeFactory) {
         this.parent = parent;
         this.mapper = mapper;
         this.queryableName = queryableName;
+        this.storeFactory = storeFactory;
     }
 
     @Override
     public Processor<KIn, Change<VIn>, KIn, Change<VOut>> get() {
         return new KTableMapValuesProcessor();
     }
 
+    @Override
+    public Set<StoreBuilder<?>> stores() {
+        if (storeFactory == null) {
+            return Collections.emptySet();

Review Comment:
   Doesn't really matter, but FYI, we can just return null here.
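
   A minimal sketch of why either return value works, assuming the caller
   null-checks the result of `stores()` before connecting anything.
   `StoreProvider`, `connectStores`, and the string store names are
   hypothetical stand-ins for illustration, not the Kafka Streams types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

public class StoresNullSketch {
    // Hypothetical stand-in for a supplier that may expose state stores.
    // The default mirrors the "just return null" option from the comment.
    interface StoreProvider {
        default Set<String> stores() {
            return null;
        }
    }

    // Hypothetical stand-in for the topology-building step: it connects
    // whatever stores the provider reports, skipping a null result.
    static List<String> connectStores(final StoreProvider provider) {
        final List<String> connected = new ArrayList<>();
        final Set<String> stores = provider.stores();
        if (stores != null) {
            connected.addAll(stores);
        }
        return connected;
    }

    // Provider that relies on the default and returns null.
    static final StoreProvider RETURNS_NULL = new StoreProvider() { };

    // Provider that explicitly returns an empty set.
    static final StoreProvider RETURNS_EMPTY = new StoreProvider() {
        @Override
        public Set<String> stores() {
            return Collections.emptySet();
        }
    };
}
```

   Under that assumption both providers connect no stores, so the choice
   between null and an empty set is purely stylistic.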



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java:
##########
@@ -62,21 +62,25 @@ public String toString() {
     @SuppressWarnings("unchecked")
     @Override
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+        processorParameters.addProcessorTo(topologyBuilder, parentNodeNames());
+
         final String processorName = processorParameters.processorName();
-        topologyBuilder.addProcessor(processorName, processorParameters.processorSupplier(), parentNodeNames());
 
         if (storeNames.length > 0) {
+            // todo(rodesai): remove me once all operators have been moved to ProcessorSupplier
             topologyBuilder.connectProcessorAndStateStores(processorName, storeNames);
         }
 
         final KTableSource<K, V> tableSource =  processorParameters.processorSupplier() instanceof KTableSource ?
                 (KTableSource<K, V>) processorParameters.processorSupplier() : null;
         if (tableSource != null) {
+            // todo(rodesai): remove once KTableImpl#doJoinOnForeignKey moved to ProcessorSupplier
             if (tableSource.materialized()) {
                 topologyBuilder.addStateStore(Objects.requireNonNull(storeFactory, "storeFactory was null"),
                                               processorName);
             }

Review Comment:
   This is interesting. I'm not 100% sure, but I think we can actually remove
this part right now, because Almog already did a PR to convert KTableSource
to implement #stores.
   
   I'm pretty sure I remember doing exactly this (removing this code) in my
original rough-draft POC, so I'd be interested to know whether we can get rid
of it right now. I guess try it and see if any tests fail?



##########
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java:
##########
@@ -1687,15 +1707,15 @@ public void shouldWrapProcessorsForStatelessOperators() {
             .peek((k, v) -> { }, Named.as("peek")) // wrapped 4
             .flatMapValues(e -> new ArrayList<>(), Named.as("flatMap")) // wrapped 5
             .toTable(Named.as("toTable")) // should be wrapped when we do StreamToTableNode
-            .filter((k, v) -> true, Named.as("filter-table")) // should be wrapped once we do TableProcessorNode

Review Comment:
   To be honest, I had no idea this was a stateful operation when I wrote this
test, as you can tell from the name
`shouldWrapProcessorsForStatelessOperators`. We should either change the name
or cut out the `.toTable().filter().toStream()` part.
   
   Honestly, either is fine with me, since we have a separate test for the
KTable filter now. That said, I kind of like the idea of having one test with
multiple operators for the KTable stuff, since you never know how things might
interact in the DSL. Maybe just move this part to a separate test, so that we
have one test covering all the stateful KTable operators you're doing here
(e.g. mapValues, filter, etc.).


