This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 8a237f5  KAFKA-6455: Session Aggregation should use window-end-time as record timestamp (#6645)
8a237f5 is described below

commit 8a237f599afa539868a138b5a2534dbf884cb4ec
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Mon May 13 00:31:44 2019 +0200

    KAFKA-6455: Session Aggregation should use window-end-time as record timestamp (#6645)

    For session windows, the result record should have the window-end timestamp as its record timestamp.

    Rebased to resolve merge conflicts. Removed the now-unused classes TupleForwarder and ForwardingCacheFlushListener (replaced with TimestampedTupleForwarder, SessionTupleForwarder, TimestampedCacheFlushListener, and SessionCacheFlushListener).

    Reviewers: John Roesler <j...@confluent.io>, Bruno Cadonna <br...@confluent.io>, Boyang Chen <boy...@confluent.io>, Bill Bejeck <b...@confluent.io>, Guozhang Wang <wangg...@gmail.com>
---
 .../internals/KStreamSessionWindowAggregate.java   |  10 +-
 ...istener.java => SessionCacheFlushListener.java} |   9 +-
 ...leForwarder.java => SessionTupleForwarder.java} |  21 +++-
 .../processor/internals/ProcessorContextImpl.java  |  11 ++-
 .../internals/ProcessorRecordContext.java          |   6 +-
 .../state/internals/AbstractStoreBuilder.java      |   4 +-
 .../state/internals/CachingSessionStore.java       |   1 -
 .../state/internals/SessionStoreBuilder.java       |  19 ++--
 .../KStreamAggregationIntegrationTest.java         | 101 ++++++++++++++-------
 .../kstream/internals/KGroupedStreamImplTest.java  |  95 ++++++++++++-------
 ...KStreamSessionWindowAggregateProcessorTest.java |  88 ++++++++++++++----
 .../internals/SessionCacheFlushListenerTest.java   |  52 +++++++++++
 ...derTest.java => SessionTupleForwarderTest.java} |  34 ++++---
 .../internals/SessionWindowedKStreamImplTest.java  |  93 ++++++++++++++-----
 .../kstream/internals/SuppressScenarioTest.java    |  37 +++++---
 .../state/internals/CachingSessionStoreTest.java   |  91 ++++++++++++++-----
 .../state/internals/SessionStoreBuilderTest.java   |  50 ++++++----
 .../java/org/apache/kafka/test/MockProcessor.java  |   9 ++
 .../kafka/streams/scala/kstream/KTableTest.scala   |  20 +++-
 19 files changed, 531 insertions(+), 220 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java index 3168393..68dc4a7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java @@ -78,7 +78,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce private class KStreamSessionWindowAggregateProcessor extends AbstractProcessor<K, V> { private SessionStore<K, Agg> store; - private TupleForwarder<Windowed<K>, Agg> tupleForwarder; + private SessionTupleForwarder<K, Agg> tupleForwarder; private StreamsMetricsImpl metrics; private InternalProcessorContext internalProcessorContext; private Sensor lateRecordDropSensor; @@ -93,7 +93,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce lateRecordDropSensor = Sensors.lateRecordDropSensor(internalProcessorContext); store = (SessionStore<K, Agg>) context.getStateStore(storeName); - tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<>(context), sendOldValues); + tupleForwarder = new SessionTupleForwarder<>(store, context, new
SessionCacheFlushListener<>(context), sendOldValues); } @Override @@ -109,10 +109,10 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce return; } - observedStreamTime = Math.max(observedStreamTime, context().timestamp()); + final long timestamp = context().timestamp(); + observedStreamTime = Math.max(observedStreamTime, timestamp); final long closeTime = observedStreamTime - windows.gracePeriodMs(); - final long timestamp = context().timestamp(); final List<KeyValue<Windowed<K>, Agg>> merged = new ArrayList<>(); final SessionWindow newSessionWindow = new SessionWindow(timestamp, timestamp); SessionWindow mergedWindow = newSessionWindow; @@ -148,7 +148,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce context().topic(), context().partition(), context().offset(), - context().timestamp(), + timestamp, mergedWindow.start(), mergedWindow.end(), closeTime, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java similarity index 85% rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java index 6c6b8fd..f40fdfe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java @@ -16,30 +16,31 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.state.internals.CacheFlushListener; -class ForwardingCacheFlushListener<K, V> implements CacheFlushListener<K, V> { +class SessionCacheFlushListener<K, V> implements CacheFlushListener<Windowed<K>, V> { private final InternalProcessorContext context; private final ProcessorNode myNode; - ForwardingCacheFlushListener(final ProcessorContext context) { + SessionCacheFlushListener(final ProcessorContext context) { this.context = (InternalProcessorContext) context; myNode = this.context.currentNode(); } @Override - public void apply(final K key, + public void apply(final Windowed<K> key, final V newValue, final V oldValue, final long timestamp) { final ProcessorNode prev = context.currentNode(); context.setCurrentNode(myNode); try { - context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(timestamp)); + context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(key.window().end())); } finally { context.setCurrentNode(prev); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java similarity index 75% rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java index 94b0ebd..bad255a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java @@ -16,8 +16,11 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.state.internals.CacheFlushListener; import org.apache.kafka.streams.state.internals.WrappedStateStore; /** @@ -25,29 +28,29 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore; * Forwarding by this class only occurs when caching is not enabled. If caching is enabled, * forwarding occurs in the flush listener when the cached store flushes. * - * @param <K> the type of the key - * @param <V> the type of the value + * @param <K> + * @param <V> */ -class TupleForwarder<K, V> { +class SessionTupleForwarder<K, V> { private final ProcessorContext context; private final boolean sendOldValues; private final boolean cachingEnabled; @SuppressWarnings("unchecked") - TupleForwarder(final StateStore store, - final ProcessorContext context, - final ForwardingCacheFlushListener<K, V> flushListener, - final boolean sendOldValues) { + SessionTupleForwarder(final StateStore store, + final ProcessorContext context, + final CacheFlushListener<Windowed<K>, V> flushListener, + final boolean sendOldValues) { this.context = context; this.sendOldValues = sendOldValues; cachingEnabled = ((WrappedStateStore) store).setFlushListener(flushListener, sendOldValues); } - public void maybeForward(final K key, + public void maybeForward(final Windowed<K> key, final V newValue, final V oldValue) { if (!cachingEnabled) { - context.forward(key, new Change<>(newValue, sendOldValues ? oldValue : null)); + context.forward(key, new Change<>(newValue, sendOldValues ? 
oldValue : null), To.all().withTimestamp(key.window().end())); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 1df6610..cd2b179 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -160,12 +160,17 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re final V value, final To to) { final ProcessorNode previousNode = currentNode(); - final long currentTimestamp = recordContext.timestamp(); + final ProcessorRecordContext previousContext = recordContext; try { toInternal.update(to); if (toInternal.hasTimestamp()) { - recordContext.setTimestamp(toInternal.timestamp()); + recordContext = new ProcessorRecordContext( + toInternal.timestamp(), + recordContext.offset(), + recordContext.partition(), + recordContext.topic(), + recordContext.headers()); } final String sendTo = toInternal.child(); @@ -183,7 +188,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re forward(child, key, value); } } finally { - recordContext.setTimestamp(currentTimestamp); + recordContext = previousContext; setCurrentNode(previousNode); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java index cc512ae..1b22482 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java @@ -29,7 +29,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; public class ProcessorRecordContext implements RecordContext { - private long timestamp; + private final long timestamp; private final long offset; private final String topic; private final int partition; @@ -48,10 +48,6 @@ public class ProcessorRecordContext implements RecordContext { this.headers = headers; } - public void setTimestamp(final long timestamp) { - this.timestamp = timestamp; - } - @Override public long offset() { return offset; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java index 898db9e..fd8267b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java @@ -38,8 +38,8 @@ abstract public class AbstractStoreBuilder<K, V, T extends StateStore> implement final Serde<K> keySerde, final Serde<V> valueSerde, final Time time) { - Objects.requireNonNull(name, "name can't be null"); - Objects.requireNonNull(time, "time can't be null"); + Objects.requireNonNull(name, "name cannot be null"); + Objects.requireNonNull(time, "time cannot be null"); this.name = name; this.keySerde = keySerde; this.valueSerde = valueSerde; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index adbaf4c..d48f540 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java 
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -28,7 +28,6 @@ import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.RecordQueue; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; - import java.util.Objects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java index a40eab3..51ef319 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java @@ -22,6 +22,8 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.state.SessionBytesStoreSupplier; import org.apache.kafka.streams.state.SessionStore; +import java.util.Objects; + public class SessionStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, SessionStore<K, V>> { @@ -31,26 +33,25 @@ public class SessionStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, Sessio final Serde<K> keySerde, final Serde<V> valueSerde, final Time time) { - super(storeSupplier.name(), keySerde, valueSerde, time); + super(Objects.requireNonNull(storeSupplier, "supplier cannot be null").name(), keySerde, valueSerde, time); this.storeSupplier = storeSupplier; } @Override public SessionStore<K, V> build() { - return new MeteredSessionStore<>(maybeWrapCaching(maybeWrapLogging(storeSupplier.get())), - storeSupplier.metricsScope(), - keySerde, - valueSerde, - time); + return new MeteredSessionStore<>( + maybeWrapCaching(maybeWrapLogging(storeSupplier.get())), + storeSupplier.metricsScope(), + keySerde, + valueSerde, + time); } private SessionStore<Bytes, byte[]> maybeWrapCaching(final SessionStore<Bytes, byte[]> inner) { if (!enableCaching) { return inner; } - return new CachingSessionStore( - inner, - storeSupplier.segmentIntervalMs()); + return new CachingSessionStore(inner, storeSupplier.segmentIntervalMs()); } private SessionStore<Bytes, byte[]> maybeWrapLogging(final SessionStore<Bytes, byte[]> inner) { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index a13408e..ea3695e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -27,7 +27,6 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; @@ -57,7 +56,6 @@ import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.kstream.internals.UnlimitedWindow; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.QueryableStoreTypes; 
import org.apache.kafka.streams.state.ReadOnlySessionStore; import org.apache.kafka.test.IntegrationTest; @@ -154,7 +152,7 @@ public class KStreamAggregationIntegrationTest { public void shouldReduce() throws Exception { produceMessages(mockTime.milliseconds()); groupedStream - .reduce(reducer, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("reduce-by-key")) + .reduce(reducer, Materialized.as("reduce-by-key")) .toStream() .to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); @@ -167,7 +165,7 @@ public class KStreamAggregationIntegrationTest { new StringDeserializer(), 10); - Collections.sort(results, KStreamAggregationIntegrationTest::compare); + results.sort(KStreamAggregationIntegrationTest::compare); assertThat(results, is(Arrays.asList(KeyValue.pair("A", "A"), KeyValue.pair("A", "A:A"), @@ -226,7 +224,7 @@ public class KStreamAggregationIntegrationTest { comparator = Comparator.comparing((KeyValue<Windowed<String>, String> o) -> o.key.key()).thenComparing(o -> o.value); - Collections.sort(windowedOutput, comparator); + windowedOutput.sort(comparator); final long firstBatchWindow = firstBatchTimestamp / 500 * 500; final long secondBatchWindow = secondBatchTimestamp / 500 * 500; @@ -267,7 +265,7 @@ public class KStreamAggregationIntegrationTest { groupedStream.aggregate( initializer, aggregator, - Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("aggregate-by-selected-key")) + Materialized.as("aggregate-by-selected-key")) .toStream() .to(outputTopic, Produced.with(Serdes.String(), Serdes.Integer())); @@ -280,7 +278,7 @@ public class KStreamAggregationIntegrationTest { new IntegerDeserializer(), 10); - Collections.sort(results, KStreamAggregationIntegrationTest::compare); + results.sort(KStreamAggregationIntegrationTest::compare); assertThat(results, is(Arrays.asList( KeyValue.pair("A", 1), @@ -335,7 +333,7 @@ public class KStreamAggregationIntegrationTest { comparator = Comparator.comparing((KeyValue<Windowed<String>, KeyValue<Integer, Long>> o) -> o.key.key()).thenComparingInt(o -> o.value.key); - Collections.sort(windowedMessages, comparator); + windowedMessages.sort(comparator); final long firstWindow = firstTimestamp / 500 * 500; final long secondWindow = secondTimestamp / 500 * 500; @@ -381,7 +379,7 @@ public class KStreamAggregationIntegrationTest { new StringDeserializer(), new LongDeserializer(), 10); - Collections.sort(results, KStreamAggregationIntegrationTest::compare); + results.sort(KStreamAggregationIntegrationTest::compare); assertThat(results, is(Arrays.asList( KeyValue.pair("A", 1L), @@ -436,7 +434,7 @@ public class KStreamAggregationIntegrationTest { new StringDeserializer(), new LongDeserializer(), 10); - Collections.sort(results, KStreamAggregationIntegrationTest::compare); + results.sort(KStreamAggregationIntegrationTest::compare); final long window = timestamp / 500 * 500; assertThat(results, is(Arrays.asList( @@ -457,13 +455,12 @@ public class KStreamAggregationIntegrationTest { @Test public void shouldCountSessionWindows() throws Exception { final long sessionGap = 5 * 60 * 1000L; - - final long t1 = mockTime.milliseconds() - TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS); final List<KeyValue<String, String>> t1Messages = Arrays.asList(new KeyValue<>("bob", "start"), new KeyValue<>("penny", "start"), new KeyValue<>("jo", "pause"), new KeyValue<>("emily", "pause")); + final long t1 = mockTime.milliseconds() - TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS); IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( 
userSessionsStream, t1Messages, @@ -473,7 +470,6 @@ public class KStreamAggregationIntegrationTest { StringSerializer.class, new Properties()), t1); - final long t2 = t1 + (sessionGap / 2); IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( userSessionsStream, @@ -499,7 +495,6 @@ public class KStreamAggregationIntegrationTest { StringSerializer.class, new Properties()), t3); - final long t4 = t3 + (sessionGap / 2); IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( userSessionsStream, @@ -513,9 +508,21 @@ public class KStreamAggregationIntegrationTest { StringSerializer.class, new Properties()), t4); + final long t5 = t4 - 1; + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( + userSessionsStream, + Collections.singletonList( + new KeyValue<>("jo", "late") // jo has late arrival + ), + TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + StringSerializer.class, + StringSerializer.class, + new Properties()), + t5); final Map<Windowed<String>, KeyValue<Long, Long>> results = new HashMap<>(); - final CountDownLatch latch = new CountDownLatch(11); + final CountDownLatch latch = new CountDownLatch(13); builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String())) .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) @@ -543,10 +550,11 @@ public class KStreamAggregationIntegrationTest { startStreams(); latch.await(30, TimeUnit.SECONDS); + assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo(KeyValue.pair(1L, t1))); assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), equalTo(KeyValue.pair(1L, t1))); assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), equalTo(KeyValue.pair(1L, t1))); - assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4, t4))), equalTo(KeyValue.pair(1L, t4))); + assertThat(results.get(new Windowed<>("jo", new SessionWindow(t5, t4))), equalTo(KeyValue.pair(2L, t4))); assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), equalTo(KeyValue.pair(2L, t2))); assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo(KeyValue.pair(2L, t4))); assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(KeyValue.pair(1L, t3))); @@ -555,13 +563,12 @@ public class KStreamAggregationIntegrationTest { @Test public void shouldReduceSessionWindows() throws Exception { final long sessionGap = 1000L; // something to do with time - - final long t1 = mockTime.milliseconds(); final List<KeyValue<String, String>> t1Messages = Arrays.asList(new KeyValue<>("bob", "start"), new KeyValue<>("penny", "start"), new KeyValue<>("jo", "pause"), new KeyValue<>("emily", "pause")); + final long t1 = mockTime.milliseconds(); IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( userSessionsStream, t1Messages, @@ -571,7 +578,6 @@ public class KStreamAggregationIntegrationTest { StringSerializer.class, new Properties()), t1); - final long t2 = t1 + (sessionGap / 2); IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( userSessionsStream, @@ -597,7 +603,6 @@ public class KStreamAggregationIntegrationTest { StringSerializer.class, new Properties()), t3); - final long t4 = t3 + (sessionGap / 2); IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( userSessionsStream, @@ -611,40 +616,65 @@ public class KStreamAggregationIntegrationTest { StringSerializer.class, new Properties()), t4); + final long t5 = t4 - 1; + 
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( + userSessionsStream, + Collections.singletonList( + new KeyValue<>("jo", "late") // jo has late arrival + ), + TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + StringSerializer.class, + StringSerializer.class, + new Properties()), + t5); - final Map<Windowed<String>, String> results = new HashMap<>(); - final CountDownLatch latch = new CountDownLatch(11); + final Map<Windowed<String>, KeyValue<String, Long>> results = new HashMap<>(); + final CountDownLatch latch = new CountDownLatch(13); final String userSessionsStore = "UserSessionsStore"; builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String())) .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .windowedBy(SessionWindows.with(ofMillis(sessionGap))) .reduce((value1, value2) -> value1 + ":" + value2, Materialized.as(userSessionsStore)) .toStream() - .foreach((key, value) -> { - results.put(key, value); + .transform(() -> new Transformer<Windowed<String>, String, KeyValue<Object, Object>>() { + private ProcessorContext context; + + @Override + public void init(final ProcessorContext context) { + this.context = context; + } + + @Override + public KeyValue<Object, Object> transform(final Windowed<String> key, final String value) { + results.put(key, KeyValue.pair(value, context.timestamp())); latch.countDown(); - }); + return null; + } + + @Override + public void close() {} + }); startStreams(); latch.await(30, TimeUnit.SECONDS); - final ReadOnlySessionStore<String, String> sessionStore = - kafkaStreams.store(userSessionsStore, QueryableStoreTypes.sessionStore()); // verify correct data received - assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo("start")); - assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), equalTo("start")); - assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), equalTo("pause")); - assertThat(results.get(new Windowed<>("jo", new SessionWindow(t4, t4))), equalTo("resume")); - assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), equalTo("pause:resume")); - assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo("pause:resume")); - assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo("stop")); + assertThat(results.get(new Windowed<>("bob", new SessionWindow(t1, t1))), equalTo(KeyValue.pair("start", t1))); + assertThat(results.get(new Windowed<>("penny", new SessionWindow(t1, t1))), equalTo(KeyValue.pair("start", t1))); + assertThat(results.get(new Windowed<>("jo", new SessionWindow(t1, t1))), equalTo(KeyValue.pair("pause", t1))); + assertThat(results.get(new Windowed<>("jo", new SessionWindow(t5, t4))), equalTo(KeyValue.pair("resume:late", t4))); + assertThat(results.get(new Windowed<>("emily", new SessionWindow(t1, t2))), equalTo(KeyValue.pair("pause:resume", t2))); + assertThat(results.get(new Windowed<>("bob", new SessionWindow(t3, t4))), equalTo(KeyValue.pair("pause:resume", t4))); + assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(KeyValue.pair("stop", t3))); // verify can query data via IQ + final ReadOnlySessionStore<String, String> sessionStore = + kafkaStreams.store(userSessionsStore, QueryableStoreTypes.sessionStore()); final KeyValueIterator<Windowed<String>, String> bob = sessionStore.fetch("bob"); assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t1, t1)), "start"))); 
assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t3, t4)), "pause:resume"))); assertFalse(bob.hasNext()); - } @Test @@ -727,6 +757,7 @@ public class KStreamAggregationIntegrationTest { }); startStreams(); assertTrue(latch.await(30, TimeUnit.SECONDS)); + assertThat(results.get(new Windowed<>("bob", new UnlimitedWindow(startTime))), equalTo(KeyValue.pair(2L, t4))); assertThat(results.get(new Windowed<>("penny", new UnlimitedWindow(startTime))), equalTo(KeyValue.pair(1L, t3))); assertThat(results.get(new Windowed<>("jo", new UnlimitedWindow(startTime))), equalTo(KeyValue.pair(1L, t4))); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java index acdbb39..97d1566 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java @@ -38,9 +38,11 @@ import org.apache.kafka.streams.kstream.Windows; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.test.MockAggregator; import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockReducer; import org.apache.kafka.test.StreamsTestUtils; import org.junit.Before; @@ -153,23 +155,31 @@ public class KGroupedStreamImplTest { .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as(INVALID_STORE_NAME)); } - private void doAggregateSessionWindows(final Map<Windowed<String>, Integer> results) { + private void doAggregateSessionWindows(final MockProcessorSupplier<Windowed<String>, Integer> supplier) { try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10)); driver.pipeInput(recordFactory.create(TOPIC, "2", "2", 15)); driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 30)); driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 70)); - driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90)); driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 100)); + driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90)); } - assertEquals(Integer.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30)))); - assertEquals(Integer.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15)))); - assertEquals(Integer.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100)))); + final Map<Windowed<String>, ValueAndTimestamp<Integer>> result + = supplier.theCapturedProcessor().lastValueAndTimestampPerKey; + assertEquals( + ValueAndTimestamp.make(2, 30L), + result.get(new Windowed<>("1", new SessionWindow(10L, 30L)))); + assertEquals( + ValueAndTimestamp.make(1, 15L), + result.get(new Windowed<>("2", new SessionWindow(15L, 15L)))); + assertEquals( + ValueAndTimestamp.make(3, 100L), + result.get(new Windowed<>("1", new SessionWindow(70L, 100L)))); } @Test public void shouldAggregateSessionWindows() { - final Map<Windowed<String>, Integer> results = new HashMap<>(); + final MockProcessorSupplier<Windowed<String>, Integer> supplier = new 
MockProcessorSupplier<>(); final KTable<Windowed<String>, Integer> table = groupedStream .windowedBy(SessionWindows.with(ofMillis(30))) .aggregate( @@ -179,15 +189,15 @@ public class KGroupedStreamImplTest { Materialized .<String, Integer, SessionStore<Bytes, byte[]>>as("session-store"). withValueSerde(Serdes.Integer())); - table.toStream().foreach(results::put); + table.toStream().process(supplier); - doAggregateSessionWindows(results); + doAggregateSessionWindows(supplier); assertEquals(table.queryableStoreName(), "session-store"); } @Test public void shouldAggregateSessionWindowsWithInternalStoreName() { - final Map<Windowed<String>, Integer> results = new HashMap<>(); + final MockProcessorSupplier<Windowed<String>, Integer> supplier = new MockProcessorSupplier<>(); final KTable<Windowed<String>, Integer> table = groupedStream .windowedBy(SessionWindows.with(ofMillis(30))) .aggregate( @@ -195,80 +205,97 @@ public class KGroupedStreamImplTest { (aggKey, value, aggregate) -> aggregate + 1, (aggKey, aggOne, aggTwo) -> aggOne + aggTwo, Materialized.with(null, Serdes.Integer())); - table.toStream().foreach(results::put); + table.toStream().process(supplier); - doAggregateSessionWindows(results); + doAggregateSessionWindows(supplier); } - private void doCountSessionWindows(final Map<Windowed<String>, Long> results) { + private void doCountSessionWindows(final MockProcessorSupplier<Windowed<String>, Long> supplier) { + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 10)); driver.pipeInput(recordFactory.create(TOPIC, "2", "2", 15)); driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 30)); driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 70)); - driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90)); driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 100)); + driver.pipeInput(recordFactory.create(TOPIC, "1", "1", 90)); } - assertEquals(Long.valueOf(2), results.get(new Windowed<>("1", new SessionWindow(10, 30)))); - assertEquals(Long.valueOf(1), results.get(new Windowed<>("2", new SessionWindow(15, 15)))); - assertEquals(Long.valueOf(3), results.get(new Windowed<>("1", new SessionWindow(70, 100)))); + final Map<Windowed<String>, ValueAndTimestamp<Long>> result = + supplier.theCapturedProcessor().lastValueAndTimestampPerKey; + assertEquals( + ValueAndTimestamp.make(2L, 30L), + result.get(new Windowed<>("1", new SessionWindow(10L, 30L)))); + assertEquals( + ValueAndTimestamp.make(1L, 15L), + result.get(new Windowed<>("2", new SessionWindow(15L, 15L)))); + assertEquals( + ValueAndTimestamp.make(3L, 100L), + result.get(new Windowed<>("1", new SessionWindow(70L, 100L)))); } @Test public void shouldCountSessionWindows() { - final Map<Windowed<String>, Long> results = new HashMap<>(); + final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>(); final KTable<Windowed<String>, Long> table = groupedStream .windowedBy(SessionWindows.with(ofMillis(30))) .count(Materialized.as("session-store")); - table.toStream().foreach(results::put); - doCountSessionWindows(results); + table.toStream().process(supplier); + doCountSessionWindows(supplier); assertEquals(table.queryableStoreName(), "session-store"); } @Test public void shouldCountSessionWindowsWithInternalStoreName() { - final Map<Windowed<String>, Long> results = new HashMap<>(); + final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>(); final KTable<Windowed<String>, Long> 
table = groupedStream .windowedBy(SessionWindows.with(ofMillis(30))) .count(); - table.toStream().foreach(results::put); - doCountSessionWindows(results); + table.toStream().process(supplier); + doCountSessionWindows(supplier); assertNull(table.queryableStoreName()); } - private void doReduceSessionWindows(final Map<Windowed<String>, String> results) { + private void doReduceSessionWindows(final MockProcessorSupplier<Windowed<String>, String> supplier) { try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 10)); driver.pipeInput(recordFactory.create(TOPIC, "2", "Z", 15)); driver.pipeInput(recordFactory.create(TOPIC, "1", "B", 30)); driver.pipeInput(recordFactory.create(TOPIC, "1", "A", 70)); - driver.pipeInput(recordFactory.create(TOPIC, "1", "B", 90)); - driver.pipeInput(recordFactory.create(TOPIC, "1", "C", 100)); + driver.pipeInput(recordFactory.create(TOPIC, "1", "B", 100)); + driver.pipeInput(recordFactory.create(TOPIC, "1", "C", 90)); } - assertEquals("A:B", results.get(new Windowed<>("1", new SessionWindow(10, 30)))); - assertEquals("Z", results.get(new Windowed<>("2", new SessionWindow(15, 15)))); - assertEquals("A:B:C", results.get(new Windowed<>("1", new SessionWindow(70, 100)))); + final Map<Windowed<String>, ValueAndTimestamp<String>> result = + supplier.theCapturedProcessor().lastValueAndTimestampPerKey; + assertEquals( + ValueAndTimestamp.make("A:B", 30L), + result.get(new Windowed<>("1", new SessionWindow(10L, 30L)))); + assertEquals( + ValueAndTimestamp.make("Z", 15L), + result.get(new Windowed<>("2", new SessionWindow(15L, 15L)))); + assertEquals( + ValueAndTimestamp.make("A:B:C", 100L), + result.get(new Windowed<>("1", new SessionWindow(70L, 100L)))); } @Test public void shouldReduceSessionWindows() { - final Map<Windowed<String>, String> results = new HashMap<>(); + final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>(); final KTable<Windowed<String>, String> table = groupedStream .windowedBy(SessionWindows.with(ofMillis(30))) .reduce((value1, value2) -> value1 + ":" + value2, Materialized.as("session-store")); - table.toStream().foreach(results::put); - doReduceSessionWindows(results); + table.toStream().process(supplier); + doReduceSessionWindows(supplier); assertEquals(table.queryableStoreName(), "session-store"); } @Test public void shouldReduceSessionWindowsWithInternalStoreName() { - final Map<Windowed<String>, String> results = new HashMap<>(); + final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>(); final KTable<Windowed<String>, String> table = groupedStream .windowedBy(SessionWindows.with(ofMillis(30))) .reduce((value1, value2) -> value1 + ":" + value2); - table.toStream().foreach(results::put); - doReduceSessionWindows(results); + table.toStream().process(supplier); + doReduceSessionWindows(supplier); assertNull(table.queryableStoreName()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index f53d9f3..2ea7700 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -21,6 +21,7 @@ import 
org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.KeyValueTimestamp; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; @@ -31,6 +32,7 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.ToInternal; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; @@ -67,6 +69,7 @@ public class KStreamSessionWindowAggregateProcessorTest { private static final long GAP_MS = 5 * 60 * 1000L; private static final String STORE_NAME = "session-store"; + private final ToInternal toInternal = new ToInternal(); private final Initializer<Long> initializer = () -> 0L; private final Aggregator<String, String, Long> aggregator = (aggKey, value, aggregate) -> aggregate + 1; private final Merger<String, Long> sessionMerger = (aggKey, aggOne, aggTwo) -> aggOne + aggTwo; @@ -78,7 +81,7 @@ public class KStreamSessionWindowAggregateProcessorTest { aggregator, sessionMerger); - private final List<KeyValue> results = new ArrayList<>(); + private final List<KeyValueTimestamp> results = new ArrayList<>(); private final Processor<String, String> processor = sessionAggregator.get(); private SessionStore<String, Long> sessionStore; private InternalMockProcessorContext context; @@ -100,7 +103,8 @@ public class KStreamSessionWindowAggregateProcessorTest { ) { @Override public <K, V> void forward(final K key, final V value, final To to) { - results.add(KeyValue.pair(key, value)); + toInternal.update(to); + results.add(new KeyValueTimestamp<>(key, value, toInternal.timestamp())); } }; @@ -195,9 +199,18 @@ public class KStreamSessionWindowAggregateProcessorTest { sessionStore.flush(); assertEquals( Arrays.asList( - KeyValue.pair(new Windowed<>(sessionId, new SessionWindow(0, 0)), new Change<>(1L, null)), - KeyValue.pair(new Windowed<>(sessionId, new SessionWindow(GAP_MS + 1, GAP_MS + 1)), new Change<>(2L, null)), - KeyValue.pair(new Windowed<>(sessionId, new SessionWindow(time, time)), new Change<>(3L, null)) + new KeyValueTimestamp<>( + new Windowed<>(sessionId, new SessionWindow(0, 0)), + new Change<>(1L, null), + 0L), + new KeyValueTimestamp<>( + new Windowed<>(sessionId, new SessionWindow(GAP_MS + 1, GAP_MS + 1)), + new Change<>(2L, null), + GAP_MS + 1), + new KeyValueTimestamp<>( + new Windowed<>(sessionId, new SessionWindow(time, time)), + new Change<>(3L, null), + time) ), results ); @@ -244,13 +257,34 @@ public class KStreamSessionWindowAggregateProcessorTest { assertEquals( Arrays.asList( - KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), new Change<>(1L, null)), - KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), new Change<>(1L, null)), - KeyValue.pair(new Windowed<>("c", new SessionWindow(0, 0)), new Change<>(1L, null)), - KeyValue.pair(new Windowed<>("d", new SessionWindow(0, GAP_MS / 2)), new Change<>(2L, null)), - KeyValue.pair(new Windowed<>("b", new SessionWindow(GAP_MS + 1, GAP_MS + 1)), new Change<>(1L, null)), - KeyValue.pair(new Windowed<>("a", new 
SessionWindow(GAP_MS + 1, GAP_MS + 1 + GAP_MS / 2)), new Change<>(2L, null)), - KeyValue.pair(new Windowed<>("c", new SessionWindow(GAP_MS + 1 + GAP_MS / 2, GAP_MS + 1 + GAP_MS / 2)), new Change<>(1L, null)) + new KeyValueTimestamp<>( + new Windowed<>("a", new SessionWindow(0, 0)), + new Change<>(1L, null), + 0L), + new KeyValueTimestamp<>( + new Windowed<>("b", new SessionWindow(0, 0)), + new Change<>(1L, null), + 0L), + new KeyValueTimestamp<>( + new Windowed<>("c", new SessionWindow(0, 0)), + new Change<>(1L, null), + 0L), + new KeyValueTimestamp<>( + new Windowed<>("d", new SessionWindow(0, GAP_MS / 2)), + new Change<>(2L, null), + GAP_MS / 2), + new KeyValueTimestamp<>( + new Windowed<>("b", new SessionWindow(GAP_MS + 1, GAP_MS + 1)), + new Change<>(1L, null), + GAP_MS + 1), + new KeyValueTimestamp<>( + new Windowed<>("a", new SessionWindow(GAP_MS + 1, GAP_MS + 1 + GAP_MS / 2)), + new Change<>(2L, null), + GAP_MS + 1 + GAP_MS / 2), + new KeyValueTimestamp<>(new Windowed<>( + "c", + new SessionWindow(GAP_MS + 1 + GAP_MS / 2, GAP_MS + 1 + GAP_MS / 2)), new Change<>(1L, null), + GAP_MS + 1 + GAP_MS / 2) ), results ); @@ -283,9 +317,18 @@ public class KStreamSessionWindowAggregateProcessorTest { assertEquals( Arrays.asList( - KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), new Change<>(1L, null)), - KeyValue.pair(new Windowed<>("b", new SessionWindow(0, 0)), new Change<>(1L, null)), - KeyValue.pair(new Windowed<>("c", new SessionWindow(0, 0)), new Change<>(1L, null)) + new KeyValueTimestamp<>( + new Windowed<>("a", new SessionWindow(0, 0)), + new Change<>(1L, null), + 0L), + new KeyValueTimestamp<>( + new Windowed<>("b", new SessionWindow(0, 0)), + new Change<>(1L, null), + 0L), + new KeyValueTimestamp<>( + new Windowed<>("c", new SessionWindow(0, 0)), + new Change<>(1L, null), + 0L) ), results ); @@ -302,9 +345,18 @@ public class KStreamSessionWindowAggregateProcessorTest { processor.process("a", "1"); assertEquals( Arrays.asList( - KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), new Change<>(1L, null)), - KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), new Change<>(null, null)), - KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 5)), new Change<>(2L, null)) + new KeyValueTimestamp<>( + new Windowed<>("a", new SessionWindow(0, 0)), + new Change<>(1L, null), + 0L), + new KeyValueTimestamp<>( + new Windowed<>("a", new SessionWindow(0, 0)), + new Change<>(null, null), + 0L), + new KeyValueTimestamp<>( + new Windowed<>("a", new SessionWindow(0, 5)), + new Change<>(2L, null), + 5L) ), results ); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java new file mode 100644 index 0000000..b25febf --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListenerTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.junit.Test; + +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.mock; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; + +public class SessionCacheFlushListenerTest { + @Test + public void shouldForwardKeyNewValueOldValueAndTimestamp() { + final InternalProcessorContext context = mock(InternalProcessorContext.class); + expect(context.currentNode()).andReturn(null).anyTimes(); + context.setCurrentNode(null); + context.setCurrentNode(null); + context.forward( + new Windowed<>("key", new SessionWindow(21L, 73L)), + new Change<>("newValue", "oldValue"), + To.all().withTimestamp(73L)); + expectLastCall(); + replay(context); + + new SessionCacheFlushListener<>(context).apply( + new Windowed<>("key", new SessionWindow(21L, 73L)), + "newValue", + "oldValue", + 42L); + + verify(context); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java similarity index 67% rename from streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java rename to streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java index f62e826..e99c684 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TupleForwarderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java @@ -16,8 +16,10 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.state.internals.WrappedStateStore; import org.junit.Test; @@ -27,7 +29,7 @@ import static org.easymock.EasyMock.mock; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; -public class TupleForwarderTest { +public class SessionTupleForwarderTest { @Test public void shouldSetFlushListenerOnWrappedStateStore() { @@ -36,13 +38,13 @@ public class TupleForwarderTest { } private void setFlushListener(final boolean sendOldValues) { - final WrappedStateStore<StateStore, Object, Object> store = mock(WrappedStateStore.class); - final ForwardingCacheFlushListener<Object, Object> flushListener = mock(ForwardingCacheFlushListener.class); + final WrappedStateStore<StateStore, Windowed<Object>, Object> store = mock(WrappedStateStore.class); + final SessionCacheFlushListener<Object, Object> flushListener = mock(SessionCacheFlushListener.class); expect(store.setFlushListener(flushListener, sendOldValues)).andReturn(false); replay(store); - new 
TupleForwarder<>(store, null, flushListener, sendOldValues); + new SessionTupleForwarder<>(store, null, flushListener, sendOldValues); verify(store); } @@ -53,21 +55,27 @@ public class TupleForwarderTest { shouldForwardRecordsIfWrappedStateStoreDoesNotCache(true); } - private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValues) { + private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final boolean sendOldValued) { final WrappedStateStore<StateStore, String, String> store = mock(WrappedStateStore.class); final ProcessorContext context = mock(ProcessorContext.class); - expect(store.setFlushListener(null, sendOldValues)).andReturn(false); - if (sendOldValues) { - context.forward("key", new Change<>("newValue", "oldValue")); + expect(store.setFlushListener(null, sendOldValued)).andReturn(false); + if (sendOldValued) { + context.forward( + new Windowed<>("key", new SessionWindow(21L, 42L)), + new Change<>("value", "oldValue"), + To.all().withTimestamp(42L)); } else { - context.forward("key", new Change<>("newValue", null)); + context.forward( + new Windowed<>("key", new SessionWindow(21L, 42L)), + new Change<>("value", null), + To.all().withTimestamp(42L)); } expectLastCall(); replay(store, context); - new TupleForwarder<>(store, context, null, sendOldValues) - .maybeForward("key", "newValue", "oldValue"); + new SessionTupleForwarder<>(store, context, null, sendOldValued) + .maybeForward(new Windowed<>("key", new SessionWindow(21L, 42L)), "value", "oldValue"); verify(store, context); } @@ -80,8 +88,8 @@ public class TupleForwarderTest { expect(store.setFlushListener(null, false)).andReturn(true); replay(store, context); - new TupleForwarder<>(store, context, null, false) - .maybeForward("key", "newValue", "oldValue"); + new SessionTupleForwarder<>(store, context, null, false) + .maybeForward(new Windowed<>("key", new SessionWindow(21L, 42L)), "value", "oldValue"); verify(store, context); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java index b2c4ec8..d1e5448 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Grouped; @@ -32,16 +33,17 @@ import org.apache.kafka.streams.kstream.SessionWindowedKStream; import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.test.MockAggregator; import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockReducer; import org.apache.kafka.test.StreamsTestUtils; import org.junit.Before; import org.junit.Test; import java.util.Arrays; -import java.util.HashMap; import java.util.List; 
import java.util.Map; import java.util.Properties; @@ -67,50 +69,93 @@ public class SessionWindowedKStreamImplTest { } @Test - public void shouldCountSessionWindowed() { - final Map<Windowed<String>, Long> results = new HashMap<>(); + public void shouldCountSessionWindowedWithCachingDisabled() { + props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + shouldCountSessionWindowed(); + } + + @Test + public void shouldCountSessionWindowedWithCachingEnabled() { + shouldCountSessionWindowed(); + } + + private void shouldCountSessionWindowed() { + final MockProcessorSupplier<Windowed<String>, Long> supplier = new MockProcessorSupplier<>(); stream.count() - .toStream() - .foreach(results::put); + .toStream() + .process(supplier); try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { processData(driver); } - assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo(2L)); - assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo(1L)); - assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo(1L)); + + final Map<Windowed<String>, ValueAndTimestamp<Long>> result = + supplier.theCapturedProcessor().lastValueAndTimestampPerKey; + + assertThat(result.size(), equalTo(3)); + assertThat( + result.get(new Windowed<>("1", new SessionWindow(10L, 15L))), + equalTo(ValueAndTimestamp.make(2L, 15L))); + assertThat( + result.get(new Windowed<>("2", new SessionWindow(599L, 600L))), + equalTo(ValueAndTimestamp.make(2L, 600L))); + assertThat( + result.get(new Windowed<>("1", new SessionWindow(600L, 600L))), + equalTo(ValueAndTimestamp.make(1L, 600L))); } @Test public void shouldReduceWindowed() { - final Map<Windowed<String>, String> results = new HashMap<>(); + final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>(); stream.reduce(MockReducer.STRING_ADDER) - .toStream() - .foreach(results::put); + .toStream() + .process(supplier); try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { processData(driver); } - assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), equalTo("1+2")); - assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo("1")); - assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo("3")); + + final Map<Windowed<String>, ValueAndTimestamp<String>> result = + supplier.theCapturedProcessor().lastValueAndTimestampPerKey; + + assertThat(result.size(), equalTo(3)); + assertThat( + result.get(new Windowed<>("1", new SessionWindow(10, 15))), + equalTo(ValueAndTimestamp.make("1+2", 15L))); + assertThat( + result.get(new Windowed<>("2", new SessionWindow(599L, 600))), + equalTo(ValueAndTimestamp.make("1+2", 600L))); + assertThat( + result.get(new Windowed<>("1", new SessionWindow(600, 600))), + equalTo(ValueAndTimestamp.make("3", 600L))); } @Test public void shouldAggregateSessionWindowed() { - final Map<Windowed<String>, String> results = new HashMap<>(); + final MockProcessorSupplier<Windowed<String>, String> supplier = new MockProcessorSupplier<>(); stream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, sessionMerger, Materialized.with(Serdes.String(), Serdes.String())) - .toStream() - .foreach(results::put); + .toStream() + .process(supplier); try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { processData(driver); } - assertThat(results.get(new Windowed<>("1", new SessionWindow(10, 15))), 
equalTo("0+0+1+2")); - assertThat(results.get(new Windowed<>("2", new SessionWindow(600, 600))), equalTo("0+1")); - assertThat(results.get(new Windowed<>("1", new SessionWindow(600, 600))), equalTo("0+3")); + + final Map<Windowed<String>, ValueAndTimestamp<String>> result = + supplier.theCapturedProcessor().lastValueAndTimestampPerKey; + + assertThat(result.size(), equalTo(3)); + assertThat( + result.get(new Windowed<>("1", new SessionWindow(10, 15))), + equalTo(ValueAndTimestamp.make("0+0+1+2", 15L))); + assertThat( + result.get(new Windowed<>("2", new SessionWindow(599, 600))), + equalTo(ValueAndTimestamp.make("0+0+1+2", 600L))); + assertThat( + result.get(new Windowed<>("1", new SessionWindow(600, 600))), + equalTo(ValueAndTimestamp.make("0+3", 600L))); } @Test @@ -126,7 +171,7 @@ public class SessionWindowedKStreamImplTest { equalTo(Arrays.asList( KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 2L), KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L), - KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), 1L)))); + KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), 2L)))); } } @@ -144,7 +189,7 @@ public class SessionWindowedKStreamImplTest { equalTo(Arrays.asList( KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "1+2"), KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "3"), - KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "1")))); + KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "1+2")))); } } @@ -165,7 +210,7 @@ public class SessionWindowedKStreamImplTest { equalTo(Arrays.asList( KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), "0+0+1+2"), KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), "0+3"), - KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), "0+1")))); + KeyValue.pair(new Windowed<>("2", new SessionWindow(599, 600)), "0+0+1+2")))); } } @@ -247,6 +292,6 @@ public class SessionWindowedKStreamImplTest { driver.pipeInput(recordFactory.create(TOPIC, "1", "2", 15)); driver.pipeInput(recordFactory.create(TOPIC, "1", "3", 600)); driver.pipeInput(recordFactory.create(TOPIC, "2", "1", 600)); + driver.pipeInput(recordFactory.create(TOPIC, "2", "2", 599)); } - } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java index 24f222b..a7151cd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java @@ -103,7 +103,8 @@ public class SuppressScenarioTest { final Topology topology = builder.build(); - final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); + final ConsumerRecordFactory<String, String> recordFactory = + new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) { driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); @@ -179,7 +180,8 @@ public class SuppressScenarioTest { .toStream() .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long())); final Topology topology = builder.build(); - final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); + final ConsumerRecordFactory<String, String> recordFactory = + new 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java index 24f222b..a7151cd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java @@ -103,7 +103,8 @@ public class SuppressScenarioTest { final Topology topology = builder.build(); - final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); + final ConsumerRecordFactory<String, String> recordFactory = + new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) { driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); @@ -179,7 +180,8 @@ public class SuppressScenarioTest { .toStream() .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long())); final Topology topology = builder.build(); - final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); + final ConsumerRecordFactory<String, String> recordFactory = + new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) { driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); driver.pipeInput(recordFactory.create("input", "k1", "v2", 1L)); @@ -248,7 +250,8 @@ public class SuppressScenarioTest { .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long())); final Topology topology = builder.build(); System.out.println(topology.describe()); - final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); + final ConsumerRecordFactory<String, String> recordFactory = + new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) { driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); driver.pipeInput(recordFactory.create("input", "k1", "v2", 1L)); @@ -311,7 +314,8 @@ public class SuppressScenarioTest { .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long())); final Topology topology = builder.build(); System.out.println(topology.describe()); - final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); + final ConsumerRecordFactory<String, String> recordFactory = + new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) { driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); driver.pipeInput(recordFactory.create("input", "k1", "v2", 1L)); @@ -370,7 +374,8 @@ public class SuppressScenarioTest { .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long())); final Topology topology = builder.build(); System.out.println(topology.describe()); - final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); + final ConsumerRecordFactory<String, String> recordFactory = + new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) { driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L)); @@ -420,7 +425,8 @@ public class SuppressScenarioTest { .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long())); final Topology topology = builder.build(); System.out.println(topology.describe()); - final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); + final ConsumerRecordFactory<String, String> recordFactory = + new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) { driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L)); @@ -475,7 +481,8 @@ public class SuppressScenarioTest { .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long())); final Topology topology = builder.build(); System.out.println(topology.describe()); - final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); + final ConsumerRecordFactory<String, String> recordFactory = + new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) { // first window
driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); @@ -492,10 +499,10 @@ public class SuppressScenarioTest { drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), asList( new KeyValueTimestamp<>("[k1@0/0]", 1L, 0L), - new KeyValueTimestamp<>("[k1@0/0]", null, 5L), + new KeyValueTimestamp<>("[k1@0/0]", null, 0L), new KeyValueTimestamp<>("[k1@0/5]", 2L, 5L), - new KeyValueTimestamp<>("[k1@0/5]", null, 1L), - new KeyValueTimestamp<>("[k1@0/5]", 3L, 1L), + new KeyValueTimestamp<>("[k1@0/5]", null, 5L), + new KeyValueTimestamp<>("[k1@0/5]", 3L, 5L), new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L), new KeyValueTimestamp<>("[k1@30/30]", 1L, 30L) ) @@ -503,14 +510,15 @@ public class SuppressScenarioTest { verify( drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), asList( - new KeyValueTimestamp<>("[k1@0/5]", 3L, 1L), + new KeyValueTimestamp<>("[k1@0/5]", 3L, 5L), new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L) ) ); } } - private static <K, V> void verify(final List<ProducerRecord<K, V>> results, final List<KeyValueTimestamp<K, V>> expectedResults) { + private static <K, V> void verify(final List<ProducerRecord<K, V>> results, + final List<KeyValueTimestamp<K, V>> expectedResults) { if (results.size() != expectedResults.size()) { throw new AssertionError(printRecords(results) + " != " + expectedResults); } @@ -525,7 +533,10 @@ public class SuppressScenarioTest { } } - private static <K, V> List<ProducerRecord<K, V>> drainProducerRecords(final TopologyTestDriver driver, final String topic, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer) { + private static <K, V> List<ProducerRecord<K, V>> drainProducerRecords(final TopologyTestDriver driver, + final String topic, + final Deserializer<K> keyDeserializer, + final Deserializer<V> valueDeserializer) { final List<ProducerRecord<K, V>> result = new LinkedList<>(); for (ProducerRecord<K, V> next = driver.readOutput(topic, keyDeserializer, valueDeserializer); next != null; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index 8c7325c..128cdc2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -17,11 +17,13 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.KeyValueTimestamp; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.SessionWindowedDeserializer; import org.apache.kafka.streams.kstream.Windowed; @@ -34,20 +36,18 @@ import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.TestUtils; import org.junit.After; -import org.junit.Before; import org.junit.Test; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import 
java.util.LinkedList; import java.util.List; import java.util.Random; import java.util.Set; -import static org.apache.kafka.common.utils.Utils.mkEntry; -import static org.apache.kafka.common.utils.Utils.mkMap; +import static java.util.Arrays.asList; import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.test.StreamsTestUtils.toList; import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList; @@ -74,8 +74,7 @@ public class CachingSessionStoreTest { private CachingSessionStore cachingStore; private ThreadCache cache; - @Before - public void setUp() { + public CachingSessionStoreTest() { final SessionKeySchema schema = new SessionKeySchema(); final RocksDBSegmentedBytesStore root = new RocksDBSegmentedBytesStore("test", "metrics-scope", 0L, SEGMENT_INTERVAL, schema); @@ -140,7 +139,7 @@ public class CachingSessionStoreTest { @Test public void shouldFetchAllSessionsWithSameRecordKey() { - final List<KeyValue<Windowed<Bytes>, byte[]>> expected = Arrays.asList( + final List<KeyValue<Windowed<Bytes>, byte[]>> expected = asList( KeyValue.pair(new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes()), KeyValue.pair(new Windowed<>(keyA, new SessionWindow(10, 10)), "2".getBytes()), KeyValue.pair(new Windowed<>(keyA, new SessionWindow(100, 100)), "3".getBytes()), @@ -247,8 +246,8 @@ public class CachingSessionStoreTest { final Windowed<Bytes> b = new Windowed<>(keyA, new SessionWindow(1, 2)); final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(2, 4)); final Windowed<String> bDeserialized = new Windowed<>("a", new SessionWindow(1, 2)); - final CachingKeyValueStoreTest.CacheFlushListenerStub<Windowed<String>, String> flushListener = - new CachingKeyValueStoreTest.CacheFlushListenerStub<>( + final CacheFlushListenerStub<Windowed<String>, String> flushListener = + new CacheFlushListenerStub<>( new SessionWindowedDeserializer<>(new StringDeserializer()), new StringDeserializer()); cachingStore.setFlushListener(flushListener, true); @@ -257,7 +256,11 @@ public class CachingSessionStoreTest { cachingStore.flush(); assertEquals( - Collections.singletonMap(bDeserialized, new Change<>("1", null)), + Collections.singletonList( + new KeyValueTimestamp<>( + bDeserialized, + new Change<>("1", null), + DEFAULT_TIMESTAMP)), flushListener.forwarded ); flushListener.forwarded.clear(); @@ -266,7 +269,11 @@ public class CachingSessionStoreTest { cachingStore.flush(); assertEquals( - Collections.singletonMap(aDeserialized, new Change<>("1", null)), + Collections.singletonList( + new KeyValueTimestamp<>( + aDeserialized, + new Change<>("1", null), + DEFAULT_TIMESTAMP)), flushListener.forwarded ); flushListener.forwarded.clear(); @@ -275,7 +282,11 @@ public class CachingSessionStoreTest { cachingStore.flush(); assertEquals( - Collections.singletonMap(aDeserialized, new Change<>("2", "1")), + Collections.singletonList( + new KeyValueTimestamp<>( + aDeserialized, + new Change<>("2", "1"), + DEFAULT_TIMESTAMP)), flushListener.forwarded ); flushListener.forwarded.clear(); @@ -284,7 +295,11 @@ public class CachingSessionStoreTest { cachingStore.flush(); assertEquals( - Collections.singletonMap(aDeserialized, new Change<>(null, "2")), + Collections.singletonList( + new KeyValueTimestamp<>( + aDeserialized, + new Change<>(null, "2"), + DEFAULT_TIMESTAMP)), flushListener.forwarded ); flushListener.forwarded.clear(); @@ -295,7 +310,7 @@ public class CachingSessionStoreTest { cachingStore.flush(); assertEquals( - Collections.emptyMap(), + 
Collections.emptyList(), flushListener.forwarded ); flushListener.forwarded.clear(); @@ -305,8 +320,8 @@ public class CachingSessionStoreTest { public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() { final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(0, 0)); final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0, 0)); - final CachingKeyValueStoreTest.CacheFlushListenerStub<Windowed<String>, String> flushListener = - new CachingKeyValueStoreTest.CacheFlushListenerStub<>( + final CacheFlushListenerStub<Windowed<String>, String> flushListener = + new CacheFlushListenerStub<>( new SessionWindowedDeserializer<>(new StringDeserializer()), new StringDeserializer()); cachingStore.setFlushListener(flushListener, false); @@ -321,11 +336,18 @@ public class CachingSessionStoreTest { cachingStore.flush(); assertEquals( - mkMap( - mkEntry(aDeserialized, new Change<>("1", null)), - mkEntry(aDeserialized, new Change<>("2", null)), - mkEntry(aDeserialized, new Change<>(null, null)) - ), + asList(new KeyValueTimestamp<>( + aDeserialized, + new Change<>("1", null), + DEFAULT_TIMESTAMP), + new KeyValueTimestamp<>( + aDeserialized, + new Change<>("2", null), + DEFAULT_TIMESTAMP), + new KeyValueTimestamp<>( + aDeserialized, + new Change<>(null, null), + DEFAULT_TIMESTAMP)), flushListener.forwarded ); flushListener.forwarded.clear(); @@ -336,7 +358,7 @@ public class CachingSessionStoreTest { cachingStore.flush(); assertEquals( - Collections.emptyMap(), + Collections.emptyList(), flushListener.forwarded ); flushListener.forwarded.clear(); @@ -466,4 +488,29 @@ public class CachingSessionStoreTest { allSessions.add(KeyValue.pair(key, value)); } + public static class CacheFlushListenerStub<K, V> implements CacheFlushListener<byte[], byte[]> { + final Deserializer<K> keyDeserializer; + final Deserializer<V> valueDeserializer; + final List<KeyValueTimestamp<K, Change<V>>> forwarded = new LinkedList<>(); + + CacheFlushListenerStub(final Deserializer<K> keyDeserializer, + final Deserializer<V> valueDeserializer) { + this.keyDeserializer = keyDeserializer; + this.valueDeserializer = valueDeserializer; + } + + @Override + public void apply(final byte[] key, + final byte[] newValue, + final byte[] oldValue, + final long timestamp) { + forwarded.add( + new KeyValueTimestamp<>( + keyDeserializer.deserialize(null, key), + new Change<>( + valueDeserializer.deserialize(null, newValue), + valueDeserializer.deserialize(null, oldValue)), + timestamp)); + } + } }
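The SessionStoreBuilderTest hunks below migrate from @Test(expected = NullPointerException.class) to JUnit's assertThrows, which pins the expectation to a single statement and lets each test verify the exception message as well. A minimal, self-contained sketch of the pattern; the Objects.requireNonNull call merely stands in for the constructor under test and is an illustrative assumption, not code from this commit:

import java.util.Objects;

import org.junit.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThrows;

public class AssertThrowsPatternTest {

    @Test
    public void shouldCaptureExceptionAndMessage() {
        // assertThrows returns the thrown exception, so the message can be asserted on
        // too, which the old @Test(expected = ...) annotation could not do.
        final Exception e = assertThrows(
            NullPointerException.class,
            () -> Objects.requireNonNull(null, "supplier cannot be null"));
        assertThat(e.getMessage(), equalTo("supplier cannot be null"));
    }
}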
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java index eb0cc55..3e3c1a7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.SessionBytesStoreSupplier; import org.apache.kafka.streams.state.SessionStore; -import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.Mock; import org.easymock.MockType; @@ -34,8 +33,13 @@ import org.junit.runner.RunWith; import java.util.Collections; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.reset; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertThrows; @RunWith(EasyMockRunner.class) public class SessionStoreBuilderTest { @@ -49,15 +53,15 @@ public class SessionStoreBuilderTest { @Before public void setUp() throws Exception { - EasyMock.expect(supplier.get()).andReturn(inner); - EasyMock.expect(supplier.name()).andReturn("name"); - EasyMock.replay(supplier); + expect(supplier.get()).andReturn(inner); + expect(supplier.name()).andReturn("name"); + replay(supplier); - builder = new SessionStoreBuilder<>(supplier, - Serdes.String(), - Serdes.String(), - new MockTime() - ); + builder = new SessionStoreBuilder<>( + supplier, + Serdes.String(), + Serdes.String(), + new MockTime()); } @Test @@ -113,29 +117,37 @@ public class SessionStoreBuilderTest { assertThat(changeLogging.wrapped(), CoreMatchers.<StateStore>equalTo(inner)); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerIfInnerIsNull() { - new SessionStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime()); + final Exception e = assertThrows(NullPointerException.class, () -> new SessionStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime())); + assertThat(e.getMessage(), equalTo("supplier cannot be null")); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerIfKeySerdeIsNull() { - new SessionStoreBuilder<>(supplier, null, Serdes.String(), new MockTime()); + final Exception e = assertThrows(NullPointerException.class, () -> new SessionStoreBuilder<>(supplier, null, Serdes.String(), new MockTime())); + assertThat(e.getMessage(), equalTo("name cannot be null")); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerIfValueSerdeIsNull() { - new SessionStoreBuilder<>(supplier, Serdes.String(), null, new MockTime()); + final Exception e = assertThrows(NullPointerException.class, () -> new SessionStoreBuilder<>(supplier, Serdes.String(), null, new MockTime())); + assertThat(e.getMessage(), equalTo("name cannot be null")); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerIfTimeIsNull() { - new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null); + reset(supplier); + expect(supplier.name()).andReturn("name"); + replay(supplier); + final Exception e = assertThrows(NullPointerException.class, () -> new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null)); + assertThat(e.getMessage(), equalTo("time cannot be null")); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowNullPointerIfMetricsScopeIsNull() { - new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime()); + final Exception e = assertThrows(NullPointerException.class, () -> new SessionStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), new MockTime())); + assertThat(e.getMessage(), equalTo("name cannot be null")); } } \ No newline at end of file
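The MockProcessor hunk below records, per key, the latest value together with the timestamp of the record that produced it. If I read the ValueAndTimestamp contract correctly (an observation about the public helper, not part of this diff), make(...) returns null rather than a pair wrapping a null value, which is consistent with the processor removing the map entry for null values instead of storing one. A small sketch of that behavior:

import org.apache.kafka.streams.state.ValueAndTimestamp;

public class ValueAndTimestampSketch {

    public static void main(final String[] args) {
        // pairs a value with the timestamp of the record that produced it
        final ValueAndTimestamp<String> latest = ValueAndTimestamp.make("0+3", 600L);
        System.out.println(latest.value());     // 0+3
        System.out.println(latest.timestamp()); // 600

        // a null value yields a null pair rather than a pair holding null, so callers
        // model deletions by dropping the entry, as the MockProcessor change below does
        System.out.println(ValueAndTimestamp.make(null, 600L)); // null
    }
}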
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java index e000b82..0ff5c8a 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java @@ -20,9 +20,12 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.state.ValueAndTimestamp; import java.time.Duration; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import static org.junit.Assert.assertEquals; @@ -32,6 +35,7 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> { public final ArrayList<String> processed = new ArrayList<>(); public final ArrayList<K> processedKeys = new ArrayList<>(); public final ArrayList<V> processedValues = new ArrayList<>(); + public final Map<K, ValueAndTimestamp<V>> lastValueAndTimestampPerKey = new HashMap<>(); public final ArrayList<Long> punctuatedStreamTime = new ArrayList<>(); public final ArrayList<Long> punctuatedSystemTime = new ArrayList<>(); @@ -77,6 +81,11 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> { public void process(final K key, final V value) { processedKeys.add(key); processedValues.add(value); + if (value != null) { + lastValueAndTimestampPerKey.put(key, ValueAndTimestamp.make(value, context().timestamp())); + } else { + lastValueAndTimestampPerKey.remove(key); + } processed.add( (key == null ? "null" : key) + ":" + (value == null ? "null" : value) + diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala index f2de3de..c0110a1 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala @@ -289,22 +289,34 @@ class KTableTest extends FlatSpec with Matchers with TestDriver { Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None } { - // late event for first window, included since grade period hasn't passed + // out-of-order event for first window, included since grace period hasn't passed testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 2L) Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None } { + // add to second window + testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 13L) + Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None + } + { + // add out-of-order to second window + testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 10L) + Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None + } + { // push stream time forward to flush other events through testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 30L) - // too-late event should get dropped from the stream + // late event should get dropped from the stream testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 3L) // should now have two results val r1 = testDriver.readRecord[String, Long](sinkTopic) r1.key shouldBe "0:2:k1" r1.value shouldBe 3L + r1.timestamp shouldBe 2L val r2 = testDriver.readRecord[String, Long](sinkTopic) - r2.key shouldBe "8:8:k1" - r2.value shouldBe 1 + r2.key shouldBe "8:13:k1" + r2.value shouldBe 3L + r2.timestamp shouldBe 13L } testDriver.readRecord[String, Long](sinkTopic) shouldBe null
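On the terminology cleanup in the Scala test above: a record is only out-of-order while stream time has not yet passed the window end plus the grace period; once it has, the record is late and gets dropped. A hedged sketch of the kind of windowed definition the test exercises; the names, the inactivity gap, and the grace period are illustrative assumptions, not code from this commit:

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Suppressed;

public class GracePeriodSketch {

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder
            .stream("events", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            // sessions close after 5 ms of inactivity; an out-of-order record is still
            // merged into its session until stream time passes window end + 10 ms grace,
            // after which it is late and dropped
            .windowedBy(SessionWindows.with(Duration.ofMillis(5)).grace(Duration.ofMillis(10)))
            .count()
            // emit only the final count once the window has closed
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream((key, count) -> key.key() + "@" + key.window().start() + "/" + key.window().end())
            .to("closed-sessions", Produced.with(Serdes.String(), Serdes.Long()));

        builder.build(); // topology only; wiring it to a test driver or KafkaStreams is omitted
    }
}

With suppression in place, only the final count per closed session reaches the output topic, and, after this commit, it carries the session end time as its record timestamp.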