vvcephei commented on a change in pull request #10381:
URL: https://github.com/apache/kafka/pull/10381#discussion_r603468316



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/api/AbstractProcessor.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+/**
+ * An abstract implementation of {@link Processor} that manages the {@link 
ProcessorContext} instance and provides default no-op
+ * implementation of {@link #close()}.
+ *
+ * @param <KIn> the type of input keys
+ * @param <VIn> the type of input values
+ * @param <KOut> the type of output keys
+ * @param <VOut> the type of output values
+ */
+public abstract class AbstractProcessor<KIn, VIn, KOut, VOut> implements 
Processor<KIn, VIn, KOut, VOut> {
+
+    protected ProcessorContext<KOut, VOut> context;
+
+    protected AbstractProcessor() {}
+
+    @Override
+    public void init(final ProcessorContext<KOut, VOut> context) {
+        this.context = context;
+    }
+
+    /**
+     * Close this processor and clean up any resources.
+     * <p>
+     * This method does nothing by default; if desired, subclasses should 
override it with custom functionality.
+     * </p>
+     */
+    @Override
+    public void close() {
+        // do nothing
+    }
+

Review comment:
       ```suggestion
   ```
   
   This is the same as the default implementation in the interface.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/api/AbstractProcessor.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+/**
+ * An abstract implementation of {@link Processor} that manages the {@link 
ProcessorContext} instance and provides default no-op
+ * implementation of {@link #close()}.
+ *
+ * @param <KIn> the type of input keys
+ * @param <VIn> the type of input values
+ * @param <KOut> the type of output keys
+ * @param <VOut> the type of output values
+ */
+public abstract class AbstractProcessor<KIn, VIn, KOut, VOut> implements 
Processor<KIn, VIn, KOut, VOut> {

Review comment:
       One minor thing that bugged me about the old PAPI was the 
AbstractProcessor class. It was widely used in places where it provided no 
value. I'm worried that we will just perpetuate that here.
   
   Since the new API has default implementations for init and close, the only 
value this new abstract class provides is when the subclass needs the context. 
In that case, we save them from the boilerplate of saving off the context in a 
field.
   
   Therefore, I'd like to call this class `ContextualProcessor` instead of 
`AbstractProcessor`. This should make it a little harder to depend on this 
class unnecessarily.
   
   ```suggestion
   public abstract class ContextualProcessor<KIn, VIn, KOut, VOut> implements 
Processor<KIn, VIn, KOut, VOut> {
   ```
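
To illustrate the distinction, here is a minimal, self-contained sketch. `SketchRecord`, `SketchContext` and `SketchProcessor` are simplified stand-ins invented for this example (the real types in `org.apache.kafka.streams.processor.api` have more members), and `UpperCaseProcessor` and `CountingProcessor` are hypothetical. Only the processor that actually forwards gains anything from the contextual base class.

```java
// Simplified stand-ins for illustration only; the real types live in
// org.apache.kafka.streams.processor.api and have more members.
final class SketchRecord<K, V> {
    final K key;
    final V value;

    SketchRecord(final K key, final V value) {
        this.key = key;
        this.value = value;
    }
}

interface SketchContext<KOut, VOut> {
    void forward(SketchRecord<KOut, VOut> record);
}

interface SketchProcessor<KIn, VIn, KOut, VOut> {
    // init and close have no-op defaults, mirroring the point made above.
    default void init(final SketchContext<KOut, VOut> context) { }

    void process(SketchRecord<KIn, VIn> record);

    default void close() { }
}

// The only thing this base class does is save the context in a field.
abstract class ContextualProcessor<KIn, VIn, KOut, VOut>
        implements SketchProcessor<KIn, VIn, KOut, VOut> {

    protected SketchContext<KOut, VOut> context;

    @Override
    public void init(final SketchContext<KOut, VOut> context) {
        this.context = context;
    }
}

// Needs the context to forward, so the base class removes boilerplate.
class UpperCaseProcessor extends ContextualProcessor<String, String, String, String> {
    @Override
    public void process(final SketchRecord<String, String> record) {
        context.forward(new SketchRecord<>(record.key, record.value.toUpperCase()));
    }
}

// Never touches the context, so it implements the interface directly.
class CountingProcessor implements SketchProcessor<String, String, Void, Void> {
    int seen = 0;

    @Override
    public void process(final SketchRecord<String, String> record) {
        seen++;
    }
}
```

With the name `ContextualProcessor`, extending it from something like `CountingProcessor` would look out of place, which is the intended nudge.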

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java
##########
@@ -32,16 +33,16 @@ public KStreamPeek(final ForeachAction<K, V> action, final 
boolean forwardDownSt
     }
 
     @Override
-    public Processor<K, V> get() {
+    public Processor<K, V, K, V> get() {
         return new KStreamPeekProcessor();
     }
 
-    private class KStreamPeekProcessor extends AbstractProcessor<K, V> {
+    private class KStreamPeekProcessor extends AbstractProcessor<K, V, K, V> {
         @Override
-        public void process(final K key, final V value) {
-            action.apply(key, value);
+        public void process(final Record<K, V> record) {
+            action.apply(record.key(), record.value());
             if (forwardDownStream) {
-                context().forward(key, value);
+                context().forward(record);
             }

Review comment:
       Hmm. What do you think about splitting this up and creating a 
ForEachProcessor that implements `Processor<K, V, Void, Void>` and therefore 
cannot forward, then simplifying this class to _always_ forward?
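
A rough sketch of what that split could look like, under stated assumptions: `SketchRecord`, `SketchContext` and `SketchProcessor` are simplified stand-ins made up for this example rather than the real Kafka Streams types, `BiConsumer` stands in for `ForeachAction`, and `PeekProcessor` is a hypothetical name for the simplified class.

```java
import java.util.function.BiConsumer;

// Simplified stand-ins for illustration only, not the real Kafka Streams API.
final class SketchRecord<K, V> {
    final K key;
    final V value;

    SketchRecord(final K key, final V value) {
        this.key = key;
        this.value = value;
    }
}

interface SketchContext<KOut, VOut> {
    void forward(SketchRecord<KOut, VOut> record);
}

interface SketchProcessor<KIn, VIn, KOut, VOut> {
    default void init(final SketchContext<KOut, VOut> context) { }

    void process(SketchRecord<KIn, VIn> record);
}

// Terminal processor: the output types are Void, so the context it would
// receive cannot accept a SketchRecord<K, V> and forwarding would not compile.
class ForEachProcessor<K, V> implements SketchProcessor<K, V, Void, Void> {
    private final BiConsumer<K, V> action;

    ForEachProcessor(final BiConsumer<K, V> action) {
        this.action = action;
    }

    @Override
    public void process(final SketchRecord<K, V> record) {
        action.accept(record.key, record.value);
    }
}

// Peek: applies the action, then always forwards the record unchanged.
class PeekProcessor<K, V> implements SketchProcessor<K, V, K, V> {
    private final BiConsumer<K, V> action;
    private SketchContext<K, V> context;

    PeekProcessor(final BiConsumer<K, V> action) {
        this.action = action;
    }

    @Override
    public void init(final SketchContext<K, V> context) {
        this.context = context;
    }

    @Override
    public void process(final SketchRecord<K, V> record) {
        action.accept(record.key, record.value);
        context.forward(record);
    }
}
```

The `forwardDownStream` flag disappears in this shape: the choice between forwarding and not forwarding is made by picking the class, and the type parameters document it.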

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java
##########
@@ -30,15 +31,15 @@ public KStreamPrint(final ForeachAction<K, V> action) {
     }
 
     @Override
-    public Processor<K, V> get() {
+    public Processor<K, V, K, V> get() {
         return new KStreamPrintProcessor();
     }
 
-    private class KStreamPrintProcessor extends AbstractProcessor<K, V> {
+    private class KStreamPrintProcessor extends AbstractProcessor<K, V, K, V> {

Review comment:
       ```suggestion
       private class KStreamPrintProcessor implements Processor<K, V, K, V> {
   ```
   See my comment on AbstractProcessor. Since we don't need the context, we 
don't need to depend on any abstract class (since the new Processor has 
defaults for init and close).

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -484,7 +484,7 @@ public String queryableStoreName() {
         Objects.requireNonNull(named, "named can't be null");
 
         final String name = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, TOSTREAM_NAME);
-        final ProcessorSupplier<K, Change<V>> kStreamMapValues = new 
KStreamMapValues<>((key, change) -> change.newValue);
+        final KStreamMapValues<K, Change<V>, V> kStreamMapValues = new 
KStreamMapValues<>((key, change) -> change.newValue);
         final ProcessorParameters<K, V, ?, ?> processorParameters = 
unsafeCastProcessorParametersToCompletelyDifferentType(

Review comment:
       Just FYI, @jeqo , during my POC, I tried to fix this right away, and it 
dragged me into migrating the entire DSL at once. I think we should do what you 
are doing instead: just migrate the individual processors first, and _then_ 
come back and drop the unsafe casts in a later PR.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
##########
@@ -18,28 +18,30 @@
 
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.AbstractProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
 
-class KStreamFlatMap<K, V, K1, V1> implements ProcessorSupplier<K, V> {
+class KStreamFlatMap<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, 
VIn, KOut, VOut> {
 
-    private final KeyValueMapper<? super K, ? super V, ? extends Iterable<? 
extends KeyValue<? extends K1, ? extends V1>>> mapper;
+    private final KeyValueMapper<? super KIn, ? super VIn, ? extends 
Iterable<? extends KeyValue<? extends KOut, ? extends VOut>>> mapper;
 
-    KStreamFlatMap(final KeyValueMapper<? super K, ? super V, ? extends 
Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) {
+    KStreamFlatMap(final KeyValueMapper<? super KIn, ? super VIn, ? extends 
Iterable<? extends KeyValue<? extends KOut, ? extends VOut>>> mapper) {
         this.mapper = mapper;
     }
 
     @Override
-    public Processor<K, V> get() {
+    public Processor<KIn, VIn, KOut, VOut> get() {
         return new KStreamFlatMapProcessor();
     }
 
-    private class KStreamFlatMapProcessor extends AbstractProcessor<K, V> {
+    private class KStreamFlatMapProcessor extends AbstractProcessor<KIn, VIn, 
KOut, VOut> {
         @Override
-        public void process(final K key, final V value) {
-            for (final KeyValue<? extends K1, ? extends V1> newPair : 
mapper.apply(key, value)) {
-                context().forward(newPair.key, newPair.value);
+        public void process(final Record<KIn, VIn> record) {
+            for (final KeyValue<? extends KOut, ? extends VOut> newPair :
+                mapper.apply(record.key(), record.value())) {

Review comment:
       This is a weird line break. It would be better to shorten the line by 
assigning the result of `mapper.apply` to a variable.
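
For example, something along these lines. This is a self-contained sketch, not the actual class: `FlatMapSketch` is a hypothetical name, `BiFunction` stands in for `KeyValueMapper`, `Map.Entry` stands in for `KeyValue`, and results are collected into a list of strings instead of being forwarded through a context.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical stand-in for the flat-map processor; JDK types replace
// KeyValueMapper (BiFunction) and KeyValue (Map.Entry) for illustration.
class FlatMapSketch<KIn, VIn, KOut, VOut> {

    private final BiFunction<? super KIn, ? super VIn,
        ? extends Iterable<? extends Map.Entry<? extends KOut, ? extends VOut>>> mapper;

    // Stands in for context().forward(...): records what would be forwarded.
    final List<String> forwarded = new ArrayList<>();

    FlatMapSketch(final BiFunction<? super KIn, ? super VIn,
        ? extends Iterable<? extends Map.Entry<? extends KOut, ? extends VOut>>> mapper) {
        this.mapper = mapper;
    }

    void process(final KIn key, final VIn value) {
        // Assigning the mapper result first keeps the for-loop header on one line.
        final Iterable<? extends Map.Entry<? extends KOut, ? extends VOut>> newPairs =
            mapper.apply(key, value);
        for (final Map.Entry<? extends KOut, ? extends VOut> newPair : newPairs) {
            forwarded.add(newPair.getKey() + "=" + newPair.getValue());
        }
    }
}
```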

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java
##########
@@ -17,11 +17,12 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.AbstractProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
 
-public class KStreamPrint<K, V> implements ProcessorSupplier<K, V> {
+public class KStreamPrint<K, V> implements ProcessorSupplier<K, V, K, V> {

Review comment:
       ```suggestion
   public class KStreamPrint<K, V> implements ProcessorSupplier<K, V, Void, 
Void> {
   ```
   
   This class never forwards.





