This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new b5d4fa7645e KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window (#12370) b5d4fa7645e is described below commit b5d4fa7645eb75d2030eb8cac78545a681686a39 Author: Hao Li <1127478+lihao...@users.noreply.github.com> AuthorDate: Tue Jul 12 10:57:11 2022 -0700 KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window (#12370) 1. Added more unit test for RocksDBTimeOrderedSessionStore and RocksDBTimeOrderedSessionSegmentedBytesStore 2. Disable cache for sliding window if emit strategy is ON_WINDOW_CLOSE Reviewers: Matthias J. Sax <matth...@confluent.io>, Guozhang Wang <wangg...@gmail.com> --- .../internals/SessionWindowedKStreamImpl.java | 3 + .../internals/SlidingWindowedKStreamImpl.java | 4 +- .../state/internals/PrefixedSessionKeySchemas.java | 14 +-- ...cksDBTimeOrderedSessionSegmentedBytesStore.java | 8 +- .../internals/RocksDBTimeOrderedSessionStore.java | 8 +- ...ctDualSchemaRocksDBSegmentedBytesStoreTest.java | 138 +++++++++++++++++++++ .../internals/AbstractSessionBytesStoreTest.java | 124 ++++++++++++++++++ .../state/internals/InMemorySessionStoreTest.java | 41 +----- .../state/internals/RocksDBSessionStoreTest.java | 57 +-------- 9 files changed, 291 insertions(+), 106 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java index c3b05cb1182..8c60019fccb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java @@ -289,7 +289,10 @@ public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple // do not enable cache if the emit final strategy is used if (materialized.cachingEnabled() && emitStrategy.type() != EmitStrategy.StrategyType.ON_WINDOW_CLOSE) { builder.withCachingEnabled(); + } else { + builder.withCachingDisabled(); } + return builder; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java index 70c75b4c82e..5ca6b911b7c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java @@ -258,7 +258,9 @@ public class SlidingWindowedKStreamImpl<K, V> extends AbstractStream<K, V> imple } else { builder.withLoggingDisabled(); } - if (materialized.cachingEnabled()) { + + // do not enable cache if the emit final strategy is used + if (materialized.cachingEnabled() && emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) { builder.withCachingEnabled(); } else { builder.withCachingDisabled(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java index 2ac25277ba8..3ce00bcb8a9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java @@ -102,8 +102,8 @@ public class PrefixedSessionKeySchemas { @Override public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, - final long from, - final long to, + final long earliestWindowEndTime, + final long latestWindowStartTime, final boolean forward) { return iterator -> { while (iterator.hasNext()) { @@ -120,13 +120,13 @@ public class PrefixedSessionKeySchemas { // We can return false directly here since keys are sorted by end time and if // we get time smaller than `from`, there won't be time within range. - if (!forward && endTime < from) { + if (!forward && endTime < earliestWindowEndTime) { return false; } if ((binaryKeyFrom == null || windowedKey.key().compareTo(binaryKeyFrom) >= 0) && (binaryKeyTo == null || windowedKey.key().compareTo(binaryKeyTo) <= 0) - && endTime >= from && startTime <= to) { + && endTime >= earliestWindowEndTime && startTime <= latestWindowStartTime) { return true; } iterator.next(); @@ -137,10 +137,10 @@ public class PrefixedSessionKeySchemas { @Override public <S extends Segment> List<S> segmentsToSearch(final Segments<S> segments, - final long from, - final long to, + final long earliestWindowEndTime, + final long latestWindowStartTime, final boolean forward) { - return segments.segments(from, Long.MAX_VALUE, forward); + return segments.segments(earliestWindowEndTime, Long.MAX_VALUE, forward); } static long extractStartTimestamp(final byte[] binaryKey) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java index 172d3218818..59e255443c0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java @@ -63,12 +63,12 @@ public class RocksDBTimeOrderedSessionSegmentedBytesStore extends AbstractRocksD } public byte[] fetchSession(final Bytes key, - final long earliestSessionEndTime, - final long latestSessionStartTime) { + final long sessionStartTime, + final long sessionEndTime) { return get(TimeFirstSessionKeySchema.toBinary( key, - earliestSessionEndTime, - latestSessionStartTime + sessionStartTime, + sessionEndTime )); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java index deb6028ef68..62a874f06c6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java @@ -122,12 +122,12 @@ public class RocksDBTimeOrderedSessionStore @Override public byte[] fetchSession(final Bytes key, - final long earliestSessionEndTime, - final long latestSessionStartTime) { + final long sessionStartTime, + final long sessiontEndTime) { return wrapped().fetchSession( key, - earliestSessionEndTime, - latestSessionStartTime + sessionStartTime, + sessiontEndTime ); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java index 0641392b2a3..3644e8eaa6d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java @@ -91,6 +91,7 @@ import static org.hamcrest.Matchers.hasEntry; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -827,6 +828,143 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends } } + @Test + public void shouldFetchSessionForSingleKey() { + // Only for TimeFirstSessionKeySchema schema + if (!(getBaseSchema() instanceof TimeFirstSessionKeySchema)) { + return; + } + + final String keyA = "a"; + final String keyB = "b"; + final String keyC = "c"; + + final StateSerdes<String, Long> stateSerdes = StateSerdes.withBuiltinTypes("dummy", String.class, Long.class); + final Bytes key1 = Bytes.wrap(stateSerdes.keySerializer().serialize("dummy", keyA)); + final Bytes key2 = Bytes.wrap(stateSerdes.keySerializer().serialize("dummy", keyB)); + final Bytes key3 = Bytes.wrap(stateSerdes.keySerializer().serialize("dummy", keyC)); + + final byte[] expectedValue1 = serializeValue(10); + final byte[] expectedValue2 = serializeValue(50); + final byte[] expectedValue3 = serializeValue(100); + final byte[] expectedValue4 = serializeValue(200); + + bytesStore.put(serializeKey(new Windowed<>(keyA, windows[0])), expectedValue1); + bytesStore.put(serializeKey(new Windowed<>(keyA, windows[1])), expectedValue2); + bytesStore.put(serializeKey(new Windowed<>(keyB, windows[2])), expectedValue3); + bytesStore.put(serializeKey(new Windowed<>(keyC, windows[3])), expectedValue4); + + final byte[] value1 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession( + key1, windows[0].start(), windows[0].end()); + assertEquals(Bytes.wrap(value1), Bytes.wrap(expectedValue1)); + + final byte[] value2 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession( + key1, windows[1].start(), windows[1].end()); + assertEquals(Bytes.wrap(value2), Bytes.wrap(expectedValue2)); + + final byte[] value3 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession( + key2, windows[2].start(), windows[2].end()); + assertEquals(Bytes.wrap(value3), Bytes.wrap(expectedValue3)); + + final byte[] value4 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession( + key3, windows[3].start(), windows[3].end()); + assertEquals(Bytes.wrap(value4), Bytes.wrap(expectedValue4)); + + final byte[] noValue = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession( + key3, 2000, 3000); + assertNull(noValue); + } + + @Test + public void shouldFetchSessionForTimeRange() { + // Only for TimeFirstSessionKeySchema schema + if (!(getBaseSchema() instanceof TimeFirstSessionKeySchema)) { + return; + } + final String keyA = "a"; + final String keyB = "b"; + final String keyC = "c"; + + final Window[] sessionWindows = new Window[4]; + sessionWindows[0] = new SessionWindow(100L, 100L); + sessionWindows[1] = new SessionWindow(50L, 200L); + sessionWindows[2] = new SessionWindow(200L, 300L); + bytesStore.put(serializeKey(new Windowed<>(keyA, sessionWindows[0])), serializeValue(10)); + bytesStore.put(serializeKey(new Windowed<>(keyB, sessionWindows[1])), serializeValue(100)); + bytesStore.put(serializeKey(new Windowed<>(keyC, sessionWindows[2])), serializeValue(200)); + + + // Fetch point + try (final KeyValueIterator<Bytes, byte[]> values = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSessions(100L, 100L)) { + + final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList( + KeyValue.pair(new Windowed<>(keyA, sessionWindows[0]), 10L) + ); + + assertEquals(expected, toList(values)); + } + + // Fetch partial boundary + try (final KeyValueIterator<Bytes, byte[]> values = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSessions(100L, 200L)) { + + final List<KeyValue<Windowed<String>, Long>> expected = asList( + KeyValue.pair(new Windowed<>(keyA, sessionWindows[0]), 10L), + KeyValue.pair(new Windowed<>(keyB, sessionWindows[1]), 100L) + ); + + assertEquals(expected, toList(values)); + } + + // Fetch partial + try (final KeyValueIterator<Bytes, byte[]> values = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSessions(99L, 201L)) { + final List<KeyValue<Windowed<String>, Long>> expected = asList( + KeyValue.pair(new Windowed<>(keyA, sessionWindows[0]), 10L), + KeyValue.pair(new Windowed<>(keyB, sessionWindows[1]), 100L) + ); + + assertEquals(expected, toList(values)); + } + + // Fetch partial + try (final KeyValueIterator<Bytes, byte[]> values = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSessions(101L, 199L)) { + assertTrue(toList(values).isEmpty()); + } + + // Fetch all boundary + try (final KeyValueIterator<Bytes, byte[]> values = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSessions(100L, 300L)) { + + final List<KeyValue<Windowed<String>, Long>> expected = asList( + KeyValue.pair(new Windowed<>(keyA, sessionWindows[0]), 10L), + KeyValue.pair(new Windowed<>(keyB, sessionWindows[1]), 100L), + KeyValue.pair(new Windowed<>(keyC, sessionWindows[2]), 200L) + ); + + assertEquals(expected, toList(values)); + } + + // Fetch all + try (final KeyValueIterator<Bytes, byte[]> values = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSessions(99L, 301L)) { + + final List<KeyValue<Windowed<String>, Long>> expected = asList( + KeyValue.pair(new Windowed<>(keyA, sessionWindows[0]), 10L), + KeyValue.pair(new Windowed<>(keyB, sessionWindows[1]), 100L), + KeyValue.pair(new Windowed<>(keyC, sessionWindows[2]), 200L) + ); + + assertEquals(expected, toList(values)); + } + + // Fetch all + try (final KeyValueIterator<Bytes, byte[]> values = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSessions(101L, 299L)) { + + final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList( + KeyValue.pair(new Windowed<>(keyB, sessionWindows[1]), 100L) + ); + + assertEquals(expected, toList(values)); + } + } + @Test public void shouldSkipAndRemoveDanglingIndex() { final String keyA = "a"; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java index 6e93f6a7ba1..78d7f08ee84 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; @@ -32,7 +33,9 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.test.InternalMockProcessorContext; @@ -60,6 +63,7 @@ import static org.apache.kafka.test.StreamsTestUtils.valuesToSet; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -73,6 +77,13 @@ public abstract class AbstractSessionBytesStoreTest { static final long SEGMENT_INTERVAL = 60_000L; static final long RETENTION_PERIOD = 10_000L; + enum StoreType { + RocksDBSessionStore, + RocksDBTimeOrderedSessionStoreWithIndex, + RocksDBTimeOrderedSessionStoreWithoutIndex, + InMemoryStore + } + SessionStore<String, Long> sessionStore; private MockRecordCollector recordCollector; @@ -83,6 +94,8 @@ public abstract class AbstractSessionBytesStoreTest { final Serde<K> keySerde, final Serde<V> valueSerde); + abstract StoreType getStoreType(); + @Before public void setUp() { sessionStore = buildSessionStore(RETENTION_PERIOD, Serdes.String(), Serdes.Long()); @@ -179,6 +192,75 @@ public abstract class AbstractSessionBytesStoreTest { } } + @Test + public void shouldFindSessionsForTimeRange() { + sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 5L); + + if (getStoreType() == StoreType.RocksDBSessionStore) { + assertThrows( + "This API is not supported by this implementation of SessionStore.", + UnsupportedOperationException.class, + () -> sessionStore.findSessions(0, 0) + ); + return; + } + + // Find point + try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions(0, 0)) { + final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList( + KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 5L) + ); + assertEquals(expected, toList(values)); + } + + sessionStore.put(new Windowed<>("b", new SessionWindow(10, 20)), 10L); + sessionStore.put(new Windowed<>("c", new SessionWindow(30, 40)), 20L); + + // Find boundary + try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions(0, 20)) { + final List<KeyValue<Windowed<String>, Long>> expected = asList( + KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 5L), + KeyValue.pair(new Windowed<>("b", new SessionWindow(10, 20)), 10L) + ); + assertEquals(expected, toList(values)); + } + + // Find left boundary + try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions(0, 19)) { + final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList( + KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 5L) + ); + assertEquals(expected, toList(values)); + } + + // Find right boundary + try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions(1, 20)) { + final List<KeyValue<Windowed<String>, Long>> expected = Collections.singletonList( + KeyValue.pair(new Windowed<>("b", new SessionWindow(10, 20)), 10L) + ); + assertEquals(expected, toList(values)); + } + + // Find partial off by 1 + try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions(19, 41)) { + final List<KeyValue<Windowed<String>, Long>> expected = asList( + KeyValue.pair(new Windowed<>("b", new SessionWindow(10, 20)), 10L), + KeyValue.pair(new Windowed<>("c", new SessionWindow(30, 40)), 20L) + ); + assertEquals(expected, toList(values)); + } + + // Find all boundary + try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions(0, 40)) { + final List<KeyValue<Windowed<String>, Long>> expected = asList( + KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 5L), + KeyValue.pair(new Windowed<>("b", new SessionWindow(10, 20)), 10L), + KeyValue.pair(new Windowed<>("c", new SessionWindow(30, 40)), 20L) + ); + assertEquals(expected, toList(values)); + } + } + @Test public void shouldBackwardFetchAllSessionsWithSameRecordKey() { final LinkedList<KeyValue<Windowed<String>, Long>> expected = new LinkedList<>(); @@ -810,4 +892,46 @@ public abstract class AbstractSessionBytesStoreTest { ); } } + + @Test + public void shouldRemoveExpired() { + sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); + if (getStoreType() == StoreType.InMemoryStore) { + sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L); + sessionStore.put(new Windowed<>("a", new SessionWindow(10, 20)), 3L); + + // Advance stream time to expire the first record + sessionStore.put(new Windowed<>("aa", new SessionWindow(10, RETENTION_PERIOD)), 4L); + } else { + sessionStore.put(new Windowed<>("aa", new SessionWindow(0, SEGMENT_INTERVAL)), 2L); + sessionStore.put(new Windowed<>("a", new SessionWindow(10, SEGMENT_INTERVAL)), 3L); + + // Advance stream time to expire the first record + sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 2 * SEGMENT_INTERVAL)), 4L); + } + + try (final KeyValueIterator<Windowed<String>, Long> iterator = + sessionStore.findSessions("a", "b", 0L, Long.MAX_VALUE) + ) { + assertEquals(valuesToSet(iterator), new HashSet<>(Arrays.asList(2L, 3L, 4L))); + } + } + + @Test + public void shouldMatchPositionAfterPut() { + final MeteredSessionStore<String, Long> meteredSessionStore = (MeteredSessionStore<String, Long>) sessionStore; + final ChangeLoggingSessionBytesStore changeLoggingSessionBytesStore = (ChangeLoggingSessionBytesStore) meteredSessionStore.wrapped(); + final SessionStore wrapped = (SessionStore) changeLoggingSessionBytesStore.wrapped(); + + context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "", new RecordHeaders())); + sessionStore.put(new Windowed<String>("a", new SessionWindow(0, 0)), 1L); + context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "", new RecordHeaders())); + sessionStore.put(new Windowed<String>("aa", new SessionWindow(0, 10)), 2L); + context.setRecordContext(new ProcessorRecordContext(0, 3, 0, "", new RecordHeaders())); + sessionStore.put(new Windowed<String>("a", new SessionWindow(10, 20)), 3L); + + final Position expected = Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 3L))))); + final Position actual = sessionStore.getPosition(); + assertThat(expected, is(actual)); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java index 7821e2c0216..8546c546716 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java @@ -16,12 +16,9 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionWindow; -import org.apache.kafka.streams.query.Position; -import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.Stores; @@ -31,13 +28,9 @@ import java.util.Arrays; import java.util.HashSet; import static java.time.Duration.ofMillis; -import static org.apache.kafka.common.utils.Utils.mkEntry; -import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.test.StreamsTestUtils.valuesToSet; -import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.hamcrest.Matchers.is; public class InMemorySessionStoreTest extends AbstractSessionBytesStoreTest { @@ -55,20 +48,8 @@ public class InMemorySessionStoreTest extends AbstractSessionBytesStoreTest { valueSerde).build(); } - @Test - public void shouldRemoveExpired() { - sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); - sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L); - sessionStore.put(new Windowed<>("a", new SessionWindow(10, 20)), 3L); - - // Advance stream time to expire the first record - sessionStore.put(new Windowed<>("aa", new SessionWindow(10, RETENTION_PERIOD)), 4L); - - try (final KeyValueIterator<Windowed<String>, Long> iterator = - sessionStore.findSessions("a", "b", 0L, Long.MAX_VALUE) - ) { - assertEquals(valuesToSet(iterator), new HashSet<>(Arrays.asList(2L, 3L, 4L))); - } + StoreType getStoreType() { + return StoreType.InMemoryStore; } @Test @@ -90,22 +71,4 @@ public class InMemorySessionStoreTest extends AbstractSessionBytesStoreTest { assertFalse(sessionStore.findSessions("a", "b", 0L, 20L).hasNext()); } - @Test - public void shouldMatchPositionAfterPut() { - final MeteredSessionStore<String, Long> meteredSessionStore = (MeteredSessionStore<String, Long>) sessionStore; - final ChangeLoggingSessionBytesStore changeLoggingSessionBytesStore = (ChangeLoggingSessionBytesStore) meteredSessionStore.wrapped(); - final InMemorySessionStore inMemorySessionStore = (InMemorySessionStore) changeLoggingSessionBytesStore.wrapped(); - - context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "", new RecordHeaders())); - sessionStore.put(new Windowed<String>("a", new SessionWindow(0, 0)), 1L); - context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "", new RecordHeaders())); - sessionStore.put(new Windowed<String>("aa", new SessionWindow(0, 10)), 2L); - context.setRecordContext(new ProcessorRecordContext(0, 3, 0, "", new RecordHeaders())); - sessionStore.put(new Windowed<String>("a", new SessionWindow(10, 20)), 3L); - - final Position expected = Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 3L))))); - final Position actual = inMemorySessionStore.getPosition(); - assertThat(expected, is(actual)); - } - } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java index b3a749a8a37..8a849d86bcb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java @@ -17,44 +17,27 @@ package org.apache.kafka.streams.state.internals; import java.util.Collection; -import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.kstream.internals.SessionWindow; -import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; -import org.apache.kafka.streams.query.Position; -import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.Stores; -import org.junit.Test; -import java.util.HashSet; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import static java.time.Duration.ofMillis; import static java.util.Arrays.asList; -import static org.apache.kafka.common.utils.Utils.mkEntry; -import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.test.StreamsTestUtils.valuesToSet; -import static org.junit.Assert.assertEquals; @RunWith(Parameterized.class) public class RocksDBSessionStoreTest extends AbstractSessionBytesStoreTest { private static final String STORE_NAME = "rocksDB session store"; - enum StoreType { - RocksDBSessionStore, - RocksDBTimeOrderedSessionStoreWithIndex, - RocksDBTimeOrderedSessionStoreWithoutIndex - } @Parameter public StoreType storeType; @Parameterized.Parameters(name = "{0}") - public static Collection<Object[]> getKeySchema() { + public static Collection<Object[]> getParamStoreType() { return asList(new Object[][] { {StoreType.RocksDBSessionStore}, {StoreType.RocksDBTimeOrderedSessionStoreWithIndex}, @@ -62,6 +45,11 @@ public class RocksDBSessionStoreTest extends AbstractSessionBytesStoreTest { }); } + @Override + StoreType getStoreType() { + return storeType; + } + @Override <K, V> SessionStore<K, V> buildSessionStore(final long retentionPeriod, final Serde<K> keySerde, @@ -102,37 +90,4 @@ public class RocksDBSessionStoreTest extends AbstractSessionBytesStoreTest { } } - @Test - public void shouldRemoveExpired() { - sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); - sessionStore.put(new Windowed<>("aa", new SessionWindow(0, SEGMENT_INTERVAL)), 2L); - sessionStore.put(new Windowed<>("a", new SessionWindow(10, SEGMENT_INTERVAL)), 3L); - - // Advance stream time to expire the first record - sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 2 * SEGMENT_INTERVAL)), 4L); - - try (final KeyValueIterator<Windowed<String>, Long> iterator = - sessionStore.findSessions("a", "b", 0L, Long.MAX_VALUE) - ) { - assertEquals(valuesToSet(iterator), new HashSet<>(asList(2L, 3L, 4L))); - } - } - - @Test - public void shouldMatchPositionAfterPut() { - final MeteredSessionStore<String, Long> meteredSessionStore = (MeteredSessionStore<String, Long>) sessionStore; - final ChangeLoggingSessionBytesStore changeLoggingSessionBytesStore = (ChangeLoggingSessionBytesStore) meteredSessionStore.wrapped(); - final WrappedStateStore rocksDBSessionStore = (WrappedStateStore) changeLoggingSessionBytesStore.wrapped(); - - context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "", new RecordHeaders())); - sessionStore.put(new Windowed<String>("a", new SessionWindow(0, 0)), 1L); - context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "", new RecordHeaders())); - sessionStore.put(new Windowed<String>("aa", new SessionWindow(0, SEGMENT_INTERVAL)), 2L); - context.setRecordContext(new ProcessorRecordContext(0, 3, 0, "", new RecordHeaders())); - sessionStore.put(new Windowed<String>("a", new SessionWindow(10, SEGMENT_INTERVAL)), 3L); - - final Position expected = Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 3L))))); - final Position actual = rocksDBSessionStore.getPosition(); - assertEquals(expected, actual); - } } \ No newline at end of file