This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 64fff8b KAFKA-7080: replace numSegments with segmentInterval (#5257) 64fff8b is described below commit 64fff8bfcc9b92769640bfaa692e19d0db8861a6 Author: John Roesler <vvcep...@users.noreply.github.com> AuthorDate: Mon Jul 2 18:07:38 2018 -0500 KAFKA-7080: replace numSegments with segmentInterval (#5257) See also KIP-319. Replace number-of-segments parameters with segment-interval-ms parameters in various places. The latter was always the parameter that several components needed, and we accidentally supplied the former because it was the one available. Reviewers: Bill Bejeck <b...@confluent.io>, Guozhang Wang <wangg...@gmail.com> --- .../kafka/streams/kstream/SessionWindows.java | 12 ++--- .../org/apache/kafka/streams/kstream/Windows.java | 29 ++++++---- .../streams/kstream/internals/KStreamImpl.java | 17 +++--- .../kstream/internals/TimeWindowedKStreamImpl.java | 12 +++-- .../org/apache/kafka/streams/state/Stores.java | 61 +++++++++++++++++++--- .../streams/state/WindowBytesStoreSupplier.java | 13 ++++- .../RocksDbSessionBytesStoreSupplier.java | 1 + .../internals/RocksDbWindowBytesStoreSupplier.java | 6 +++ .../state/internals/WindowStoreBuilder.java | 2 +- .../kafka/streams/kstream/JoinWindowsTest.java | 10 ---- .../kafka/streams/kstream/SessionWindowsTest.java | 2 +- .../kafka/streams/kstream/TimeWindowsTest.java | 2 +- .../apache/kafka/streams/kstream/WindowsTest.java | 8 ++- .../apache/kafka/streams/perf/SimpleBenchmark.java | 17 +++--- .../internals/InternalTopologyBuilderTest.java | 10 +++- .../org/apache/kafka/streams/state/StoresTest.java | 29 +++++----- .../internals/RocksDBSegmentedBytesStoreTest.java | 4 +- .../state/internals/RocksDBSessionStoreTest.java | 2 +- .../state/internals/RocksDBWindowStoreTest.java | 60 ++++++++------------- .../StreamThreadStateStoreProviderTest.java | 8 ++- 20 files changed, 190 insertions(+), 115 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java index aa3dec1..fc1fb9f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java @@ -66,11 +66,11 @@ import java.util.Objects; public final class SessionWindows { private final long gapMs; - private long maintainDurationMs; + private final long maintainDurationMs; - private SessionWindows(final long gapMs) { + private SessionWindows(final long gapMs, final long maintainDurationMs) { this.gapMs = gapMs; - maintainDurationMs = Windows.DEFAULT_MAINTAIN_DURATION_MS; + this.maintainDurationMs = maintainDurationMs; } /** @@ -85,7 +85,8 @@ public final class SessionWindows { if (inactivityGapMs <= 0) { throw new IllegalArgumentException("Gap time (inactivityGapMs) cannot be zero or negative."); } - return new SessionWindows(inactivityGapMs); + final long oneDayMs = 24 * 60 * 60_000L; + return new SessionWindows(inactivityGapMs, oneDayMs); } /** @@ -99,9 +100,8 @@ public final class SessionWindows { if (durationMs < gapMs) { throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than window gap."); } - maintainDurationMs = durationMs; - return this; + return new SessionWindows(gapMs, durationMs); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java index 09fdfce..53ead1e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java @@ -36,18 +36,10 @@ import java.util.Map; */ public abstract class Windows<W extends Window> { - private static final int DEFAULT_NUM_SEGMENTS = 3; + private long maintainDurationMs = 24 * 60 * 60 * 1000L; // default: one day + @Deprecated public int segments = 3; - static final long DEFAULT_MAINTAIN_DURATION_MS = 24 * 60 * 60 * 1000L; // one day - - private long maintainDurationMs; - - public int segments; - - protected Windows() { - segments = DEFAULT_NUM_SEGMENTS; - maintainDurationMs = DEFAULT_MAINTAIN_DURATION_MS; - } + protected Windows() {} /** * Set the window maintain duration (retention time) in milliseconds. @@ -77,13 +69,28 @@ public abstract class Windows<W extends Window> { } /** + * Return the segment interval in milliseconds. + * + * @return the segment interval + */ + @SuppressWarnings("deprecation") // The deprecation is on the public visibility of segments. We intend to make the field private later. + public long segmentInterval() { + // Pinned arbitrarily to a minimum of 60 seconds. Profiling may indicate a different value is more efficient. + final long minimumSegmentInterval = 60_000L; + // Scaled to the (possibly overridden) retention period + return Math.max(maintainMs() / (segments - 1), minimumSegmentInterval); + } + + /** * Set the number of segments to be used for rolling the window store. * This function is not exposed to users but can be called by developers that extend this class. * * @param segments the number of segments to be used * @return itself * @throws IllegalArgumentException if specified segments is small than 2 + * @deprecated since 2.1 Override segmentInterval() instead. */ + @Deprecated protected Windows<W> segments(final int segments) throws IllegalArgumentException { if (segments < 2) { throw new IllegalArgumentException("Number of segments must be at least 2."); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index bc56a3d..acfdf35 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -844,12 +844,17 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V final Serde<K> keySerde, final Serde<V> valueSerde, final String storeName) { - return Stores.windowStoreBuilder(Stores.persistentWindowStore(storeName, - windows.maintainMs(), - windows.segments, - windows.size(), - true), keySerde, valueSerde); - + return Stores.windowStoreBuilder( + Stores.persistentWindowStore( + storeName, + windows.maintainMs(), + windows.size(), + true, + windows.segmentInterval() + ), + keySerde, + valueSerde + ); } private class KStreamImplJoin { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java index e545f48..7d6d174 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java @@ -155,11 +155,13 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr private <VR> StoreBuilder<WindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) { WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier(); if (supplier == null) { - supplier = Stores.persistentWindowStore(materialized.storeName(), - windows.maintainMs(), - windows.segments, - windows.size(), - false); + supplier = Stores.persistentWindowStore( + materialized.storeName(), + windows.maintainMs(), + windows.size(), + false, + windows.segmentInterval() + ); } final StoreBuilder<WindowStore<K, VR>> builder = Stores.windowStoreBuilder(supplier, materialized.keySerde(), diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index eebd59f..c1b81c6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -79,7 +79,7 @@ public class Stores { /** * Create a persistent {@link KeyValueBytesStoreSupplier}. * @param name name of the store (cannot be {@code null}) - * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used + * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used * to build a persistent store */ public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String name) { @@ -90,7 +90,7 @@ public class Stores { /** * Create an in-memory {@link KeyValueBytesStoreSupplier}. * @param name name of the store (cannot be {@code null}) - * @return an instance of a {@link KeyValueBytesStoreSupplier} than can be used to + * @return an instance of a {@link KeyValueBytesStoreSupplier} than can be used to * build an in-memory store */ public static KeyValueBytesStoreSupplier inMemoryKeyValueStore(final String name) { @@ -151,25 +151,72 @@ public class Stores { * @param windowSize size of the windows (cannot be negative) * @param retainDuplicates whether or not to retain duplicates. * @return an instance of {@link WindowBytesStoreSupplier} + * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, long, long, boolean, long)} instead */ + @Deprecated public static WindowBytesStoreSupplier persistentWindowStore(final String name, final long retentionPeriod, final int numSegments, final long windowSize, final boolean retainDuplicates) { + if (numSegments < 2) { + throw new IllegalArgumentException("numSegments cannot must smaller than 2"); + } + + final long legacySegmentInterval = Math.max(retentionPeriod / (numSegments - 1), 60_000L); + + return persistentWindowStore( + name, + retentionPeriod, + windowSize, + retainDuplicates, + legacySegmentInterval + ); + } + + /** + * Create a persistent {@link WindowBytesStoreSupplier}. + * @param name name of the store (cannot be {@code null}) + * @param retentionPeriod length of time to retain data in the store (cannot be negative) + * @param windowSize size of the windows (cannot be negative) + * @param retainDuplicates whether or not to retain duplicates. + * @return an instance of {@link WindowBytesStoreSupplier} + */ + public static WindowBytesStoreSupplier persistentWindowStore(final String name, + final long retentionPeriod, + final long windowSize, + final boolean retainDuplicates) { + // we're arbitrarily defaulting to segments no smaller than one minute. + final long defaultSegmentInterval = Math.max(retentionPeriod / 2, 60_000L); + return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, defaultSegmentInterval); + } + + /** + * Create a persistent {@link WindowBytesStoreSupplier}. + * @param name name of the store (cannot be {@code null}) + * @param retentionPeriod length of time to retain data in the store (cannot be negative) + * @param segmentInterval size of segments in ms (must be at least one minute) + * @param windowSize size of the windows (cannot be negative) + * @param retainDuplicates whether or not to retain duplicates. + * @return an instance of {@link WindowBytesStoreSupplier} + */ + public static WindowBytesStoreSupplier persistentWindowStore(final String name, + final long retentionPeriod, + final long windowSize, + final boolean retainDuplicates, + final long segmentInterval) { Objects.requireNonNull(name, "name cannot be null"); if (retentionPeriod < 0) { throw new IllegalArgumentException("retentionPeriod cannot be negative"); } - if (numSegments < 2) { - throw new IllegalArgumentException("numSegments cannot must smaller than 2"); - } if (windowSize < 0) { throw new IllegalArgumentException("windowSize cannot be negative"); } - final long segmentIntervalMs = Math.max(retentionPeriod / (numSegments - 1), 60_000L); + if (segmentInterval < 60_000) { + throw new IllegalArgumentException("segmentInterval must be at least one minute"); + } - return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod, segmentIntervalMs, windowSize, retainDuplicates); + return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod, segmentInterval, windowSize, retainDuplicates); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java index 9cf70c2..c071b34 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java @@ -33,11 +33,22 @@ public interface WindowBytesStoreSupplier extends StoreSupplier<WindowStore<Byte * It is also used to reduce the amount of data that is scanned when caching is enabled. * * @return number of segments + * @deprecated since 2.1. Use {@link WindowBytesStoreSupplier#segmentIntervalMs()} instead. */ + @Deprecated int segments(); /** - * The size of the windows any store created from this supplier is creating. + * The size of the segments (in milliseconds) the store has. + * If your store is segmented then this should be the size of segments in the underlying store. + * It is also used to reduce the amount of data that is scanned when caching is enabled. + * + * @return size of the segments (in milliseconds) + */ + long segmentIntervalMs(); + + /** + * The size of the windows (in milliseconds) any store created from this supplier is creating. * * @return window size */ diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java index 37968ce..45df39c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java @@ -53,6 +53,7 @@ public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSuppli @Override public long segmentIntervalMs() { + // Selected somewhat arbitrarily. Profiling may reveal a different value is preferable. return Math.max(retentionPeriod / 2, 60_000L); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java index 9421bcb..5c7b099 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java @@ -66,12 +66,18 @@ public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier return "rocksdb-window-state"; } + @Deprecated @Override public int segments() { return (int) (retentionPeriod / segmentInterval) + 1; } @Override + public long segmentIntervalMs() { + return segmentInterval; + } + + @Override public long windowSize() { return windowSize; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java index 97b4883..31d063a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java @@ -52,7 +52,7 @@ public class WindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, WindowS keySerde, valueSerde, storeSupplier.windowSize(), - storeSupplier.segments()); + storeSupplier.segmentIntervalMs()); } private WindowStore<Bytes, byte[]> maybeWrapLogging(final WindowStore<Bytes, byte[]> inner) { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java index 0611704..7b22df1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java @@ -113,16 +113,6 @@ public class JoinWindowsTest { } @Test - public void shouldUseWindowSizeForMaintainDurationWhenSizeLargerThanDefaultMaintainMs() { - final long size = Windows.DEFAULT_MAINTAIN_DURATION_MS; - - final JoinWindows windowSpec = JoinWindows.of(size); - final long windowSize = windowSpec.size(); - - assertEquals(windowSize, windowSpec.maintainMs()); - } - - @Test public void retentionTimeMustNoBeSmallerThanWindowSize() { final JoinWindows windowSpec = JoinWindows.of(anySize); final long windowSize = windowSpec.size(); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java index 8c0a0b9..d0e5996 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java @@ -50,7 +50,7 @@ public class SessionWindowsTest { @Test public void retentionTimeShouldBeGapIfGapIsLargerThanDefaultRetentionTime() { - final long windowGap = 2 * Windows.DEFAULT_MAINTAIN_DURATION_MS; + final long windowGap = 2 * SessionWindows.with(1).maintainMs(); assertEquals(windowGap, SessionWindows.with(windowGap).maintainMs()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java index 7e6bb3e..390678f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java @@ -47,7 +47,7 @@ public class TimeWindowsTest { @Test public void shouldUseWindowSizeAsRentitionTimeIfWindowSizeIsLargerThanDefaultRetentionTime() { - final long windowSize = 2 * Windows.DEFAULT_MAINTAIN_DURATION_MS; + final long windowSize = 2 * TimeWindows.of(1).maintainMs(); assertEquals(windowSize, TimeWindows.of(windowSize).maintainMs()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java index 77faf1a..2e9246e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java @@ -40,7 +40,13 @@ public class WindowsTest { @Test public void shouldSetNumberOfSegments() { final int anySegmentSizeLargerThanOne = 5; - assertEquals(anySegmentSizeLargerThanOne, new TestWindows().segments(anySegmentSizeLargerThanOne).segments); + final TestWindows testWindow = new TestWindows(); + final long maintainMs = testWindow.maintainMs(); + + assertEquals( + maintainMs / (anySegmentSizeLargerThanOne - 1), + testWindow.segments(anySegmentSizeLargerThanOne).segmentInterval() + ); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java index 7179293..654fd03 100644 --- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java +++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java @@ -32,10 +32,10 @@ import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; @@ -469,13 +469,18 @@ public class SimpleBenchmark { setStreamProperties("simple-benchmark-streams-with-store"); final StreamsBuilder builder = new StreamsBuilder(); - final StoreBuilder<WindowStore<Integer, byte[]>> storeBuilder - = Stores.windowStoreBuilder(Stores.persistentWindowStore("store", + + final StoreBuilder<WindowStore<Integer, byte[]>> storeBuilder = Stores.windowStoreBuilder( + Stores.persistentWindowStore( + "store", AGGREGATE_WINDOW_SIZE * 3, - 3, AGGREGATE_WINDOW_SIZE, - false), - INTEGER_SERDE, BYTE_SERDE); + false, + 60_000L + ), + INTEGER_SERDE, + BYTE_SERDE + ); builder.addStateStore(storeBuilder.withCachingEnabled()); final KStream<Integer, byte[]> source = builder.stream(topic); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index b0674ea..fb64130 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -532,7 +532,15 @@ public class InternalTopologyBuilderTest { builder.setApplicationId("appId"); builder.addSource(null, "source", null, null, null, "topic"); builder.addProcessor("processor", new MockProcessorSupplier(), "source"); - builder.addStateStore(Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 30000, 3, 10000, false), Serdes.String(), Serdes.String()), "processor"); + + builder.addStateStore( + Stores.windowStoreBuilder( + Stores.persistentWindowStore("store", 30_000L, 10_000L, false), + Serdes.String(), + Serdes.String() + ), + "processor" + ); final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups(); final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next(); final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog"); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java index 5383c27..23f246d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java @@ -28,7 +28,6 @@ import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.hamcrest.core.IsNot.not; -import static org.junit.Assert.fail; public class StoresTest { @@ -54,22 +53,28 @@ public class StoresTest { @Test(expected = NullPointerException.class) public void shouldThrowIfIPersistentWindowStoreStoreNameIsNull() { - Stores.persistentWindowStore(null, 0, 1, 0, false); + Stores.persistentWindowStore(null, 0L, 0L, false, 60_000L); } @Test(expected = IllegalArgumentException.class) public void shouldThrowIfIPersistentWindowStoreRetentionPeriodIsNegative() { - Stores.persistentWindowStore("anyName", -1, 1, 0, false); + Stores.persistentWindowStore("anyName", -1L, 0L, false, 60_000L); } + @Deprecated @Test(expected = IllegalArgumentException.class) public void shouldThrowIfIPersistentWindowStoreIfNumberOfSegmentsSmallerThanOne() { - Stores.persistentWindowStore("anyName", 0, 0, 0, false); + Stores.persistentWindowStore("anyName", 0L, 1, 0L, false); } @Test(expected = IllegalArgumentException.class) public void shouldThrowIfIPersistentWindowStoreIfWindowSizeIsNegative() { - Stores.persistentWindowStore("anyName", 0, 1, -1, false); + Stores.persistentWindowStore("anyName", 0L, -1L, false); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIfIPersistentWindowStoreIfSegmentIntervalIsTooSmall() { + Stores.persistentWindowStore("anyName", 1L, 1L, false, 59_999L); } @Test(expected = NullPointerException.class) @@ -98,16 +103,6 @@ public class StoresTest { } @Test - public void shouldThrowIllegalArgumentExceptionWhenTryingToConstructWindowStoreWithLessThanTwoSegments() { - try { - Stores.persistentWindowStore("store", 1, 1, 1, false); - fail("Should have thrown illegal argument exception as number of segments is less than 2"); - } catch (final IllegalArgumentException e) { - // ok - } - } - - @Test public void shouldCreateInMemoryKeyValueStore() { assertThat(Stores.inMemoryKeyValueStore("memory").get(), instanceOf(InMemoryKeyValueStore.class)); } @@ -124,7 +119,7 @@ public class StoresTest { @Test public void shouldCreateRocksDbWindowStore() { - assertThat(Stores.persistentWindowStore("store", 1, 3, 1, false).get(), instanceOf(RocksDBWindowStore.class)); + assertThat(Stores.persistentWindowStore("store", 1L, 1L, false).get(), instanceOf(RocksDBWindowStore.class)); } @Test @@ -134,7 +129,7 @@ public class StoresTest { @Test public void shouldBuildWindowStore() { - final WindowStore<String, String> store = Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 3, 2, 3, true), + final WindowStore<String, String> store = Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 3L, 3L, true), Serdes.String(), Serdes.String()).build(); assertThat(store, not(nullValue())); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java index d7a7283..6b9e7a8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java @@ -244,7 +244,7 @@ public class RocksDBSegmentedBytesStoreTest { bytesStore = new RocksDBSegmentedBytesStore(storeName, retention, - numSegments, + segmentInterval, schema); bytesStore.init(context, bytesStore); @@ -271,7 +271,7 @@ public class RocksDBSegmentedBytesStoreTest { bytesStore = new RocksDBSegmentedBytesStore(storeName, retention, - numSegments, + segmentInterval, schema); bytesStore.init(context, bytesStore); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java index c95cbba..b44d369 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java @@ -53,7 +53,7 @@ public class RocksDBSessionStoreTest { schema.init("topic"); final RocksDBSegmentedBytesStore bytesStore = - new RocksDBSegmentedBytesStore("session-store", 10000L, 3, schema); + new RocksDBSegmentedBytesStore("session-store", 10_000L, 60_000L, schema); sessionStore = new RocksDBSessionStore<>(bytesStore, Serdes.String(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index 2a84a7b..ac481a7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -69,9 +69,9 @@ public class RocksDBWindowStoreTest { private final int numSegments = 3; private final long windowSize = 3L; private final String windowName = "window"; - private final long segmentSize = 60_000; - private final long retentionPeriod = segmentSize * (numSegments - 1); - private final Segments segments = new Segments(windowName, retentionPeriod, segmentSize); + private final long segmentInterval = 60_000; + private final long retentionPeriod = segmentInterval * (numSegments - 1); + private final Segments segments = new Segments(windowName, retentionPeriod, segmentInterval); private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", Serdes.Integer(), Serdes.String()); private final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>(); @@ -107,12 +107,7 @@ public class RocksDBWindowStoreTest { private WindowStore<Integer, String> createWindowStore(final ProcessorContext context, final boolean retainDuplicates) { final WindowStore<Integer, String> store = Stores.windowStoreBuilder( - Stores.persistentWindowStore( - windowName, - retentionPeriod, - numSegments, - windowSize, - retainDuplicates), + Stores.persistentWindowStore(windowName, retentionPeriod, windowSize, retainDuplicates, segmentInterval), Serdes.Integer(), Serdes.String()).build(); @@ -134,10 +129,10 @@ public class RocksDBWindowStoreTest { setCurrentTime(currentTime); windowStore.put(1, "one"); - currentTime = currentTime + segmentSize; + currentTime = currentTime + segmentInterval; setCurrentTime(currentTime); windowStore.put(1, "two"); - currentTime = currentTime + segmentSize; + currentTime = currentTime + segmentInterval; setCurrentTime(currentTime); windowStore.put(1, "three"); @@ -145,7 +140,7 @@ public class RocksDBWindowStoreTest { final WindowStoreIterator<String> iterator = windowStore.fetch(1, 0, currentTime); // roll to the next segment that will close the first - currentTime = currentTime + segmentSize; + currentTime = currentTime + segmentInterval; setCurrentTime(currentTime); windowStore.put(1, "four"); @@ -167,7 +162,7 @@ public class RocksDBWindowStoreTest { @Test public void testRangeAndSinglePointFetch() { windowStore = createWindowStore(context, false); - final long startTime = segmentSize - 4L; + final long startTime = segmentInterval - 4L; putFirstBatch(windowStore, startTime, context); @@ -226,7 +221,7 @@ public class RocksDBWindowStoreTest { @Test public void shouldGetAll() { windowStore = createWindowStore(context, false); - final long startTime = segmentSize - 4L; + final long startTime = segmentInterval - 4L; putFirstBatch(windowStore, startTime, context); @@ -245,7 +240,7 @@ public class RocksDBWindowStoreTest { @Test public void shouldFetchAllInTimeRange() { windowStore = createWindowStore(context, false); - final long startTime = segmentSize - 4L; + final long startTime = segmentInterval - 4L; putFirstBatch(windowStore, startTime, context); @@ -274,7 +269,7 @@ public class RocksDBWindowStoreTest { @Test public void testFetchRange() { windowStore = createWindowStore(context, false); - final long startTime = segmentSize - 4L; + final long startTime = segmentInterval - 4L; putFirstBatch(windowStore, startTime, context); @@ -322,7 +317,7 @@ public class RocksDBWindowStoreTest { @Test public void testPutAndFetchBefore() { windowStore = createWindowStore(context, false); - final long startTime = segmentSize - 4L; + final long startTime = segmentInterval - 4L; putFirstBatch(windowStore, startTime, context); @@ -368,7 +363,7 @@ public class RocksDBWindowStoreTest { @Test public void testPutAndFetchAfter() { windowStore = createWindowStore(context, false); - final long startTime = segmentSize - 4L; + final long startTime = segmentInterval - 4L; putFirstBatch(windowStore, startTime, context); @@ -414,7 +409,7 @@ public class RocksDBWindowStoreTest { @Test public void testPutSameKeyTimestamp() { windowStore = createWindowStore(context, true); - final long startTime = segmentSize - 4L; + final long startTime = segmentInterval - 4L; setCurrentTime(startTime); windowStore.put(0, "zero"); @@ -444,8 +439,8 @@ public class RocksDBWindowStoreTest { windowStore = createWindowStore(context, false); // to validate segments - final long startTime = segmentSize * 2; - final long increment = segmentSize / 2; + final long startTime = segmentInterval * 2; + final long increment = segmentInterval / 2; setCurrentTime(startTime); windowStore.put(0, "zero"); assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir)); @@ -573,8 +568,8 @@ public class RocksDBWindowStoreTest { @Test public void testRestore() throws IOException { - final long startTime = segmentSize * 2; - final long increment = segmentSize / 2; + final long startTime = segmentInterval * 2; + final long increment = segmentInterval / 2; windowStore = createWindowStore(context, false); setCurrentTime(startTime); @@ -725,7 +720,7 @@ public class RocksDBWindowStoreTest { new File(storeDir, segments.segmentName(6L)).mkdir(); windowStore.close(); - context.setStreamTime(segmentSize * 6L); + context.setStreamTime(segmentInterval * 6L); windowStore = createWindowStore(context, false); final List<String> expected = Utils.mkList(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)); @@ -768,13 +763,9 @@ public class RocksDBWindowStoreTest { public void shouldFetchAndIterateOverExactKeys() { final long windowSize = 0x7a00000000000000L; final long retentionPeriod = 0x7a00000000000000L; + final WindowStore<String, String> windowStore = Stores.windowStoreBuilder( - Stores.persistentWindowStore( - windowName, - retentionPeriod, - 2, - windowSize, - true), + Stores.persistentWindowStore(windowName, retentionPeriod, windowSize, true), Serdes.String(), Serdes.String()).build(); @@ -837,7 +828,7 @@ public class RocksDBWindowStoreTest { @Test public void shouldNoNullPointerWhenSerdeDoesNotHandleNull() { windowStore = new RocksDBWindowStore<>( - new RocksDBSegmentedBytesStore(windowName, retentionPeriod, numSegments, new WindowKeySchema()), + new RocksDBSegmentedBytesStore(windowName, retentionPeriod, segmentInterval, new WindowKeySchema()), Serdes.Integer(), new SerdeThatDoesntHandleNull(), false, @@ -850,12 +841,7 @@ public class RocksDBWindowStoreTest { @Test public void shouldFetchAndIterateOverExactBinaryKeys() { final WindowStore<Bytes, String> windowStore = Stores.windowStoreBuilder( - Stores.persistentWindowStore( - windowName, - 60000, - 2, - 60000, - true), + Stores.persistentWindowStore(windowName, 60_000L, 60_000L, true), Serdes.Bytes(), Serdes.String()).build(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index 66ea3c4..4916cb0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -76,7 +76,13 @@ public class StreamThreadStateStoreProviderTest { topology.addSource("the-source", topicName); topology.addProcessor("the-processor", new MockProcessorSupplier(), "the-source"); topology.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv-store"), Serdes.String(), Serdes.String()), "the-processor"); - topology.addStateStore(Stores.windowStoreBuilder(Stores.persistentWindowStore("window-store", 10, 2, 2, false), Serdes.String(), Serdes.String()), "the-processor"); + topology.addStateStore( + Stores.windowStoreBuilder( + Stores.persistentWindowStore("window-store", 10L, 2L, false), + Serdes.String(), + Serdes.String()), + "the-processor" + ); final Properties properties = new Properties(); final String applicationId = "applicationId";