This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 25457377e3 HOTFIX: fix broken trunk due to conflicting and overlapping commits (#12074) 25457377e3 is described below commit 25457377e3883390b313fb3e6e7f8d1308e5a5ea Author: Matthias J. Sax <matth...@confluent.io> AuthorDate: Wed Apr 20 14:39:15 2022 -0700 HOTFIX: fix broken trunk due to conflicting and overlapping commits (#12074) Reviewers: Victoria Xia <victoria....@confluent.io>, David Arthur <mum...@gmail.com> --- .../TimeOrderedCachingPersistentWindowStoreTest.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java index dd8b54662d..697e3c6f51 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java @@ -32,11 +32,12 @@ import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; -import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; @@ -204,14 +205,13 @@ public class TimeOrderedCachingPersistentWindowStoreTest { builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String())) - .transform(() -> new Transformer<String, String, KeyValue<String, String>>() { + .process(() -> new Processor<String, String, String, String>() { private WindowStore<String, ValueAndTimestamp<String>> store; private int numRecordsProcessed; - private ProcessorContext context; + private org.apache.kafka.streams.processor.api.ProcessorContext<String, String> context; - @SuppressWarnings("unchecked") @Override - public void init(final ProcessorContext processorContext) { + public void init(final org.apache.kafka.streams.processor.api.ProcessorContext<String, String> processorContext) { this.context = processorContext; this.store = processorContext.getStateStore("store-name"); int count = 0; @@ -227,7 +227,7 @@ public class TimeOrderedCachingPersistentWindowStoreTest { } @Override - public KeyValue<String, String> transform(final String key, final String value) { + public void process(final Record<String, String> record) { int count = 0; try (final KeyValueIterator<Windowed<String>, ValueAndTimestamp<String>> all = store.all()) { @@ -239,13 +239,14 @@ public class TimeOrderedCachingPersistentWindowStoreTest { assertThat(count, equalTo(numRecordsProcessed)); - store.put(value, ValueAndTimestamp.make(value, context.timestamp()), context.timestamp()); + store.put(record.value(), ValueAndTimestamp.make(record.value(), record.timestamp()), record.timestamp()); numRecordsProcessed++; - return new KeyValue<>(key, value); + context.forward(record); } + @Override public void close() { }