This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 25457377e3 HOTFIX: fix broken trunk due to conflicting and overlapping 
commits (#12074)
25457377e3 is described below

commit 25457377e3883390b313fb3e6e7f8d1308e5a5ea
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Wed Apr 20 14:39:15 2022 -0700

    HOTFIX: fix broken trunk due to conflicting and overlapping commits (#12074)
    
    Reviewers: Victoria Xia <victoria....@confluent.io>, David Arthur 
<mum...@gmail.com>
---
 .../TimeOrderedCachingPersistentWindowStoreTest.java    | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
index dd8b54662d..697e3c6f51 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
@@ -32,11 +32,12 @@ import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
-import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -204,14 +205,13 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
 
         builder.stream(TOPIC,
             Consumed.with(Serdes.String(), Serdes.String()))
-            .transform(() -> new Transformer<String, String, KeyValue<String, 
String>>() {
+            .process(() -> new Processor<String, String, String, String>() {
                 private WindowStore<String, ValueAndTimestamp<String>> store;
                 private int numRecordsProcessed;
-                private ProcessorContext context;
+                private 
org.apache.kafka.streams.processor.api.ProcessorContext<String, String> context;
 
-                @SuppressWarnings("unchecked")
                 @Override
-                public void init(final ProcessorContext processorContext) {
+                public void init(final 
org.apache.kafka.streams.processor.api.ProcessorContext<String, String> 
processorContext) {
                     this.context = processorContext;
                     this.store = processorContext.getStateStore("store-name");
                     int count = 0;
@@ -227,7 +227,7 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
                 }
 
                 @Override
-                public KeyValue<String, String> transform(final String key, 
final String value) {
+                public void process(final Record<String, String> record) {
                     int count = 0;
 
                     try (final KeyValueIterator<Windowed<String>, 
ValueAndTimestamp<String>> all = store.all()) {
@@ -239,13 +239,14 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
 
                     assertThat(count, equalTo(numRecordsProcessed));
 
-                    store.put(value, ValueAndTimestamp.make(value, 
context.timestamp()), context.timestamp());
+                    store.put(record.value(), 
ValueAndTimestamp.make(record.value(), record.timestamp()), record.timestamp());
 
                     numRecordsProcessed++;
 
-                    return new KeyValue<>(key, value);
+                    context.forward(record);
                 }
 
+
                 @Override
                 public void close() {
                 }

Reply via email to