mjsax commented on code in PR #17198:
URL: https://github.com/apache/kafka/pull/17198#discussion_r1779666808


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1243,24 +1243,6 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final 
KTable<K, VO> table,
             builder);
     }
 
-    @Override
-    @Deprecated
-    public <KR, VR> KStream<KR, VR> transform(final 
org.apache.kafka.streams.kstream.TransformerSupplier<? super K, ? super V, 
KeyValue<KR, VR>> transformerSupplier,
-                                              final String... stateStoreNames) 
{
-        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't 
be null");
-        final String name = builder.newProcessorName(TRANSFORM_NAME);
-        return flatTransform(new 
TransformerSupplierAdapter<>(transformerSupplier), Named.as(name), 
stateStoreNames);

Review Comment:
   `TransformerSupplierAdapter` should be unused now, and I think we can remove 
it with this PR, too, including `TransformerSupplierAdapterTest`?



##########
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala:
##########
@@ -447,28 +410,4 @@ class KStreamTest extends TestDriver {
     val joinNode = 
builder.build().describe().subtopologies().asScala.head.nodes().asScala.toList(1)
     assertEquals("my-name", joinNode.name())
   }
-
-  @nowarn
-  @Test
-  def testSettingNameOnTransform(): Unit = {
-    class TestTransformer extends Transformer[String, String, KeyValue[String, 
String]] {

Review Comment:
   Same



##########
streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java:
##########
@@ -35,7 +35,6 @@
  * @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
  * @see Transformer
  * @see TransformerSupplier
- * @see KStream#transform(TransformerSupplier, String...)

Review Comment:
   This does make sense, as we don't remove `ValueTransformerSupplier` yet.



##########
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala:
##########
@@ -276,61 +274,6 @@ class TopologyTest {
     assertEquals(getTopologyScala, getTopologyJava)
   }
 
-  @nowarn
-  @Test
-  def shouldBuildIdenticalTopologyInJavaNScalaTransform(): Unit = {

Review Comment:
   Wondering if we have a test gap, as we don't have a similar test for 
`process()` -- should we add a new test for `#process()` like this one (or 
rewrite this test for `#process()`)?



##########
streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java:
##########
@@ -360,11 +368,9 @@ public KeyValue<Integer, Integer> transform(final Integer 
key, final Integer val
                             throw new RuntimeException("Injected test error");
                         }
 
-                        return KeyValue.pair(key, value);
+                        store.put(key, value);

Review Comment:
   We do we add a put here?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -179,8 +178,7 @@ <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super 
K, ? super V, ? exten
      * altered arbitrarily).
      * The provided {@link KeyValueMapper} is applied to each input record and 
computes a new output record.
      * Thus, an input record {@code <K,V>} can be transformed into an output 
record {@code <K':V'>}.
-     * This is a stateless record-by-record operation (cf. {@link 
#transform(TransformerSupplier, String...)} for
-     * stateful record transformation).
+     * This is a stateless record-by-record operation.

Review Comment:
   It think we should link to `#process(...)` here, too, instead of removing 
(same elsewhere where it make sense).
   
   Btw: feel free to skip updating JavaDocs of other deprecated methods which 
will be remove in follow up PRs of this ticket.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java:
##########
@@ -33,7 +33,6 @@
  * @param <R> {@link org.apache.kafka.streams.KeyValue KeyValue} return type 
(both key and value type can be set
  *            arbitrarily)
  * @see Transformer
- * @see KStream#transform(TransformerSupplier, String...)

Review Comment:
   As for `Transformer`



##########
streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java:
##########
@@ -43,7 +43,6 @@
  * @param <R> {@link KeyValue} return type (both key and value type can be set
  *            arbitrarily)
  * @see TransformerSupplier
- * @see KStream#transform(TransformerSupplier, String...)

Review Comment:
   We will also remove `Transformer` in a follow up PR completely, right? So no 
need to have any updates in this PR?



##########
streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java:
##########
@@ -329,26 +331,32 @@ private KafkaStreams buildWithDeduplicationTopology(final 
String stateDirPath) {
             Serdes.Integer())
         );
         builder.<Integer, Integer>stream(inputTopic)
-            .transform(
-                () -> new 
org.apache.kafka.streams.kstream.Transformer<Integer, Integer, 
KeyValue<Integer, Integer>>() {
+            .process(
+                () -> new Processor<Integer, Integer, Integer, Integer>() {
+                    private ProcessorContext<Integer, Integer> context;
                     private KeyValueStore<Integer, Integer> store;
 
-                    @SuppressWarnings("unchecked")
                     @Override
-                    public void init(final ProcessorContext context) {
-                        store = (KeyValueStore<Integer, Integer>) 
context.getStateStore(storeName);
+                    public void init(final ProcessorContext<Integer, Integer> 
context) {
+                        this.context = context;
+                        store = context.getStateStore(storeName);
                     }
 
                     @Override
-                    public KeyValue<Integer, Integer> transform(final Integer 
key, final Integer value) {
+                    public void process(final Record<Integer, Integer> record) 
{
+                        final Integer key = record.key();
+                        final Integer value = record.value();
+
                         if (skipRecord.get()) {
                             // we only forward so we can verify the skipping 
by reading the output topic
                             // the goal is skipping is to not modify the state 
store
-                            return KeyValue.pair(key, value);
+                            context.forward(record);
+                            return;
                         }
 
                         if (store.get(key) != null) {
-                            return null;
+                            store.delete(key);

Review Comment:
   Why do we insert a delete here?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -2334,85 +2332,6 @@ public void process(final Record<Object, Object> record) 
{}
         assertEquals(2, punctuatedWallClockTime.size());
     }
 
-    @ParameterizedTest
-    @MethodSource("data")        
-    @SuppressWarnings("deprecation")
-    public void shouldPunctuateWithTimestampPreservedInProcessorContext(final 
boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {

Review Comment:
   I think we need to keep this test, but rewrite using `#process()`.



##########
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala:
##########
@@ -222,42 +221,6 @@ class KStreamTest extends TestDriver {
     testDriver.close()
   }
 
-  @nowarn
-  @Test
-  def testTransformCorrectlyRecords(): Unit = {

Review Comment:
   Same question as above. Seems we are lacking a similar test for `#process()` 
?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to