vvcephei commented on code in PR #11993:
URL: https://github.com/apache/kafka/pull/11993#discussion_r847777395


##########
streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java:
##########
@@ -60,7 +61,7 @@ public void init(final InternalProcessorContext<KOut, VOut> context) {
 
     @Override
     public void process(final Record<KIn, VIn> record) {
-        processor().process(record);
+        mockProcessor.process(record);

Review Comment:
   Oh, now I see. Why not just capture the result of 
`ProcessorAdapter.adapt(mockProcessor)` in the constructor and use that for the 
`this.mockProcessor` field instead? That way it would already be the right 
type. That's basically what we were doing before, although I agree we don't 
need to have that method to expose the processor on the super class.
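A minimal sketch of the suggestion above, using hypothetical stand-in types (`OldProcessor`, `NewProcessor`, `SimpleRecord`, `Adapter`, `MockNode`) rather than the real Kafka classes. It assumes the adapter simply wraps an old-style processor in the new interface; the point is only that the adapted processor is captured once in the constructor, so the field already has the right type and no accessor on the superclass is needed.

```java
// Hypothetical stand-ins for the old and new processor APIs; not the real Kafka types.
interface OldProcessor<K, V> {
    void process(K key, V value);
}

interface NewProcessor<K, V> {
    void process(SimpleRecord<K, V> record);
}

// Simplified record holding only a key and a value.
final class SimpleRecord<K, V> {
    final K key;
    final V value;

    SimpleRecord(final K key, final V value) {
        this.key = key;
        this.value = value;
    }
}

// Stand-in for ProcessorAdapter.adapt(...): wraps an old-style processor in the new interface.
final class Adapter {
    static <K, V> NewProcessor<K, V> adapt(final OldProcessor<K, V> delegate) {
        return record -> delegate.process(record.key, record.value);
    }
}

final class MockNode<K, V> {
    // The field holds the adapted processor, so it already has the new-API type.
    private final NewProcessor<K, V> mockProcessor;

    MockNode(final OldProcessor<K, V> oldProcessor) {
        // Adapt once, in the constructor, and keep the result.
        this.mockProcessor = Adapter.adapt(oldProcessor);
    }

    void process(final SimpleRecord<K, V> record) {
        // No superclass accessor is involved; the field is called directly.
        mockProcessor.process(record);
    }
}
```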



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java:
##########
@@ -747,14 +748,19 @@ public void shouldNotAllowNullTopicChooserWhenAddingSink() {
         assertThrows(NullPointerException.class, () -> builder.addSink("name", (TopicNameExtractor<Object, Object>) null, null, null, null));
     }
 
+    // TODO: check if this needs to be expanded

Review Comment:
   I think these are fine.



##########
streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java:
##########
@@ -336,6 +344,14 @@ public void setRecordMetadata(final String topic,
         recordMetadata = new MockRecordMetadata(topic, partition, offset);
     }
 
+    public void setCurrentSystemTimeMs(final long currentSystemTimeMs) {
+        this.currentSystemTimeMs = currentSystemTimeMs;
+    }
+
+    public void setCurrentStreamTimeMs(final long currentStreamTimeMs) {
+        this.currentStreamTimeMs = currentStreamTimeMs;
+    }

Review Comment:
   This wasn't in the KIP... Do we need it for something?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java:
##########
@@ -747,14 +748,19 @@ public void shouldNotAllowNullTopicChooserWhenAddingSink() {
         assertThrows(NullPointerException.class, () -> builder.addSink("name", (TopicNameExtractor<Object, Object>) null, null, null, null));
     }
 
+    // TODO: check if this needs to be expanded
     @Test
     public void shouldNotAllowNullNameWhenAddingProcessor() {
-        assertThrows(NullPointerException.class, () -> builder.addProcessor(null, () -> null));
+        assertThrows(NullPointerException.class, () -> builder.addProcessor(null,
+                                                                            (ProcessorSupplier<Object, Object, Object, Object>) () -> null
+        ));

Review Comment:
   The alignment is weird here. We either put the whole method call on one line 
(if it's short), or put each argument on its own line.
   
   E.g., something like this:
   ```suggestion
           assertThrows(
               NullPointerException.class,
               () -> builder.addProcessor(
                   null,
                   (ProcessorSupplier<Object, Object, Object, Object>) () -> null
               )
           );
   ```



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1501,9 +1508,60 @@ public void process(final org.apache.kafka.streams.processor.ProcessorSupplier<?
     }
 
     @Override
-    public void process(final ProcessorSupplier<? super K, ? super V, Void, Void> processorSupplier,
-                        final Named named,
-                        final String... stateStoreNames) {
+    public <KOut, VOut> KStream<KOut, VOut> process(
+        final ProcessorSupplier<? super K, ? super V, KOut, VOut> processorSupplier,
+        final String... stateStoreNames
+    ) {
+        return process(processorSupplier, Named.as(builder.newProcessorName(PROCESSOR_NAME)), stateStoreNames);
+    }
+
+    @Override
+    public <KOut, VOut> KStream<KOut, VOut> process(
+        final ProcessorSupplier<? super K, ? super V, KOut, VOut> processorSupplier,
+        final Named named,
+        final String... stateStoreNames
+    ) {
+        Objects.requireNonNull(processorSupplier, "processorSupplier can't be null");
+        Objects.requireNonNull(named, "named can't be null");
+        Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array");
+        ApiUtils.checkSupplier(processorSupplier);
+        for (final String stateStoreName : stateStoreNames) {
+            Objects.requireNonNull(stateStoreName, "stateStoreNames can't be null");
+        }
+
+        final String name = new NamedInternal(named).name();
+        final StatefulProcessorNode<? super K, ? super V> processNode = new StatefulProcessorNode<>(
+            name,
+            new ProcessorParameters<>(processorSupplier, name),
+            stateStoreNames);
+
+        builder.addGraphNode(graphNode, processNode);
+
+        // cannot inherit key and value serde
+        return new KStreamImpl<>(
+            name,
+            null,
+            null,
+            subTopologySourceNodes,
+            true,
+            processNode,
+            builder);
+    }
+
+    @Override
+    public <VOut> KStream<K, VOut> processValues(
+        final FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
+        final String... stateStoreNames
+    ) {
+        return processValues(processorSupplier, NamedInternal.empty(), stateStoreNames);

Review Comment:
   Hmm, why does this one use `NamedInternal.empty()`, while the other one (on 
L1515) uses `Named.as(builder.newProcessorName(PROCESSOR_NAME))`?
   
   We should be really sure that we get the right processor naming behavior. 
Processor names are extremely difficult to change later, so if we get this 
wrong, we basically have to support the wrong behavior forever with a bunch of 
special-case logic.



##########
streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java:
##########
@@ -152,4 +153,18 @@ public void registerCacheFlushListener(final String namespace, final DirtyEntryF
     public String changelogFor(final String storeName) {
         return ProcessorStateManager.storeChangelogTopic(applicationId(), storeName, taskId().topologyName());
     }
+
+    // TODO check if needs precision

Review Comment:
   Same here; not sure exactly what this means.



##########
streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java:
##########
@@ -212,6 +214,7 @@ public String changelogFor(final String storeName) {
         return "mock-changelog";
     }
 
+    // TODO check if need to add detail on forwarding

Review Comment:
   I don't think so. It's just doing the same thing that the main 
implementation does.



##########
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java:
##########
@@ -421,6 +421,7 @@ public void shouldResizeMaxBufferAfterThreadRemovalTimesOut() throws Interrupted
     }
 
     @Test
+    @SuppressWarnings("deprecation")

Review Comment:
   Many of the tests in this PR are just testing deprecated methods, so it's 
fine to simply suppress the warnings there. But this one uses a deprecated 
method to test something else. I think we should try to minimize add-on 
changes, since this PR is already so big, but can you add a TODO to follow up 
in a subsequent PR to migrate this and tests like it (similar to what we did 
for the new PAPI)?



##########
streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java:
##########
@@ -187,6 +188,7 @@ public String changelogFor(final String storeName) {
         return "mock-changelog";
     }
 
+    // TODO: Check if needs precision

Review Comment:
   What does this mean?



##########
streams/src/test/java/org/apache/kafka/test/MockProcessor.java:
##########
@@ -53,6 +53,11 @@ public void process(final K key, final V value) {
         delegate.process(new Record<>(key, value, context.timestamp(), context.headers()));
     }
 
+    // TODO check needs updating

Review Comment:
   Why not just call `delegate.process(record)`? Actually, why do we need to 
add this method here at all?



##########
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala:
##########
@@ -796,6 +807,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
    */
+  @deprecated

Review Comment:
   Oy, forgot about the Scala API ... again. Do we need to add the new 
interfaces to the Scala API? Also, the Scala `deprecated` annotation takes 
parameters saying what you should use instead and since when it was deprecated, 
which we should make use of.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java:
##########
@@ -263,5 +264,19 @@ public void registerCacheFlushListener(final String namespace, final DirtyEntryF
         public String changelogFor(final String storeName) {
             return ProcessorStateManager.storeChangelogTopic(applicationId(), storeName, taskId().topologyName());
         }
+
+        // TODO double check this additional messages

Review Comment:
   What does this mean?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java:
##########
@@ -37,30 +43,67 @@
     @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
     private final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> oldProcessorSupplier;
     private final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier;
+    private final FixedKeyProcessorSupplier<KIn, VIn, VOut> fixedKeyProcessorSupplier;
     private final String processorName;
 
     @SuppressWarnings("deprecation") // Old PAPI compatibility.
     public ProcessorParameters(final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> processorSupplier,
                                final String processorName) {
         oldProcessorSupplier = processorSupplier;
         this.processorSupplier = () -> ProcessorAdapter.adapt(processorSupplier.get());
+        fixedKeyProcessorSupplier = null;
         this.processorName = processorName;
     }
 
     public ProcessorParameters(final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier,
                                final String processorName) {
         oldProcessorSupplier = null;
         this.processorSupplier = processorSupplier;
+        fixedKeyProcessorSupplier = null;
+        this.processorName = processorName;
+    }
+
+    public ProcessorParameters(final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier,
+                               final String processorName) {
+        oldProcessorSupplier = null;
+        this.processorSupplier = null;
+        fixedKeyProcessorSupplier = processorSupplier;
         this.processorName = processorName;
     }
 
     public ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier() {
         return processorSupplier;
     }
 
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-    public org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> oldProcessorSupplier() {
-        return oldProcessorSupplier;
+    public FixedKeyProcessorSupplier<KIn, VIn, VOut> fixedKeyProcessorSupplier() {
+        return fixedKeyProcessorSupplier;
+    }
+
+    public void addProcessorTo(final InternalTopologyBuilder topologyBuilder, final String[] parentNodeNames) {
+        if (processorSupplier != null) {
+            topologyBuilder.addProcessor(processorName, processorSupplier, parentNodeNames);
+            if (processorSupplier.stores() != null) {
+                for (final StoreBuilder<?> storeBuilder : processorSupplier.stores()) {
+                    topologyBuilder.addStateStore(storeBuilder, processorName);
+                }
+            }
+        }
+        if (fixedKeyProcessorSupplier != null) {
+            topologyBuilder.addProcessor(processorName, fixedKeyProcessorSupplier, parentNodeNames);
+            if (fixedKeyProcessorSupplier.stores() != null) {
+                for (final StoreBuilder<?> storeBuilder : fixedKeyProcessorSupplier.stores()) {
+                    topologyBuilder.addStateStore(storeBuilder, processorName);
+                }
+            }
+        }
+
+        // temporary hack until KIP-478 is fully implemented
+        // Old PAPI. Needs to be migrated.
+        if (oldProcessorSupplier != null && oldProcessorSupplier.stores() != null) {
+            for (final StoreBuilder<?> storeBuilder : oldProcessorSupplier.stores()) {
+                topologyBuilder.addStateStore(storeBuilder, processorName);
+            }
+        }

Review Comment:
   Are we forgetting to add the old processor to the topology here?
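
One way to reason about this question, as a minimal sketch with hypothetical stand-in types (`StoreProvider`, `OldSupplier`, `NewSupplier`, `Params`), where store builders are reduced to plain names and the supplier is invoked eagerly for brevity. It assumes `stores()` has a default implementation returning `null`, which is what the null checks in the hunk suggest. In the hunk, the old-PAPI constructor already assigns a lambda adapter to `processorSupplier`, so the first branch may well add the adapted processor, and only the stores would need the separate old-supplier branch because the lambda cannot carry over `stores()`. Whether that matches the intent of the PR is for the author to confirm.

```java
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins; not the real Kafka types.
interface StoreProvider {
    // Assumed default: no stores unless the implementation overrides this.
    default Set<String> stores() {
        return null;
    }
}

interface OldSupplier extends StoreProvider {
    String get();
}

interface NewSupplier extends StoreProvider {
    String get();
}

final class Params {
    private final OldSupplier oldSupplier;
    private final NewSupplier supplier;

    Params(final OldSupplier oldSupplier) {
        this.oldSupplier = oldSupplier;
        // Lambda adapter: supplies an adapted processor but inherits the default stores() == null.
        this.supplier = () -> "adapted(" + oldSupplier.get() + ")";
    }

    void addTo(final List<String> processors, final List<String> stores) {
        if (supplier != null) {
            // The adapted old processor is added through the new-API branch.
            processors.add(supplier.get());
            if (supplier.stores() != null) {
                stores.addAll(supplier.stores());
            }
        }
        // Only the stores still have to come from the old supplier itself.
        if (oldSupplier != null && oldSupplier.stores() != null) {
            stores.addAll(oldSupplier.stores());
        }
    }
}
```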



##########
streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java:
##########
@@ -119,6 +119,7 @@ public Set<StoreBuilder<?>> stores() {
         }
     }
 
+    @SuppressWarnings("deprecation")

Review Comment:
   Should we plan to update the demo to use the new API in a follow-on PR?



##########
streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java:
##########
@@ -500,4 +501,19 @@ public void addChangelogForStore(final String storeName, final String changelogT
     public String changelogFor(final String storeName) {
         return storeToChangelogTopic.get(storeName);
     }
+
+    // TODO validate if needs comment to understand forwarding

Review Comment:
   It _seems_ pretty straightforward to me...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

