mjsax commented on code in PR #21674:
URL: https://github.com/apache/kafka/pull/21674#discussion_r2902737486
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java:
##########
@@ -49,17 +46,6 @@ public void destroy() throws IOException {
Utils.delete(dbDir);
}
- @Override
- public synchronized void deleteRange(final Bytes keyFrom, final Bytes
keyTo) {
- super.deleteRange(keyFrom, keyTo);
- }
-
- @Override
- public void openDB(final Map<String, Object> configs, final File stateDir)
{
- super.openDB(configs, stateDir);
- // skip the registering step
- }
Review Comment:
If we make `openDB` public on `RocksDBStore`, we don't need this override
any longer.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########
@@ -181,8 +181,11 @@ public void init(final StateStoreContext stateStoreContext,
false);
}
+ // This method must be public, to allow us to share code inside
AbstractSegments#openSegmentDB(...)
+ // We declare the same method on interface Segment.openDB(...) to make it
accessible
+ // and interface methods are `public`
@SuppressWarnings("unchecked")
- void openDB(final Map<String, Object> configs, final File stateDir) {
+ public void openDB(final Map<String, Object> configs, final File stateDir)
{
Review Comment:
This is the kinda undesired, but necessary side effect: we need to make this
`public` if we want to share more "segments code" -- let me know what you think
about this tradeoff.
It's not super clean TBH, as it does break encapsulation; if we think the
benefit is too small (to save a little bit of duplicated code), also very happy
to revert this part.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java:
##########
@@ -59,7 +59,9 @@ abstract class AbstractSegments<S extends Segment> implements
Segments<S> {
protected abstract S createSegment(long segmentId, String segmentName);
- protected abstract void openSegmentDB(final S segment, final
StateStoreContext context);
+ protected void openSegment(final S segment, final StateStoreContext
context) {
+ segment.openDB(context.appConfigs(), context.stateDir());
+ }
Review Comment:
Let me know what you think of this -- I thought it would be good to have a
default impl, which works for most `Segments` but it's a little bit of a hack
to make it work. Cf other comments.
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java:
##########
@@ -126,4 +126,11 @@ public void shouldCompareSegmentIdOnly() {
segment2.close();
segment3.close();
}
+
+ @Test
+ public void shouldGetCorrectSegmentString() {
Review Comment:
Closing small test gap.
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeadersTest.java:
##########
@@ -66,132 +64,41 @@ public void close() {
}
@Test
- public void shouldGetSegmentIdsFromTimestamp() {
- assertEquals(0, segments.segmentId(0));
- assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
- assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
- assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
- }
-
- @Test
- public void shouldGetSegmentNameFromId() {
- assertEquals("test.0", segments.segmentName(0));
- assertEquals("test." + SEGMENT_INTERVAL, segments.segmentName(1));
- assertEquals("test." + 2 * SEGMENT_INTERVAL, segments.segmentName(2));
- }
-
- @Test
- public void shouldCreateSegments() {
- final SessionSegmentWithHeaders segment1 =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- final SessionSegmentWithHeaders segment2 =
segments.getOrCreateSegmentIfLive(1, context, -1L);
- final SessionSegmentWithHeaders segment3 =
segments.getOrCreateSegmentIfLive(2, context, -1L);
-
- assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
- assertTrue(new File(context.stateDir(), "test/test." +
SEGMENT_INTERVAL).isDirectory());
- assertTrue(new File(context.stateDir(), "test/test." + 2 *
SEGMENT_INTERVAL).isDirectory());
- assertTrue(segment1.isOpen());
- assertTrue(segment2.isOpen());
- assertTrue(segment3.isOpen());
- }
-
- @Test
- public void shouldNotCreateSegmentThatIsAlreadyExpired() {
- final long streamTime = updateStreamTimeAndCreateSegment(7);
- assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
- assertFalse(new File(context.stateDir(), "test/test.0").exists());
- }
-
- @Test
- public void shouldCleanupSegmentsThatHaveExpired() {
- final SessionSegmentWithHeaders segment1 =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- final SessionSegmentWithHeaders segment2 =
segments.getOrCreateSegmentIfLive(1, context, -1L);
- final SessionSegmentWithHeaders segment3 =
segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);
-
- assertFalse(segment1.isOpen());
- assertFalse(segment2.isOpen());
- assertTrue(segment3.isOpen());
- assertFalse(new File(context.stateDir(), "test/test.0").exists());
- assertFalse(new File(context.stateDir(), "test/test." +
SEGMENT_INTERVAL).exists());
- assertTrue(new File(context.stateDir(), "test/test." + 7 *
SEGMENT_INTERVAL).exists());
- }
-
- @Test
- public void shouldGetSegmentForTimestamp() {
- final SessionSegmentWithHeaders segment =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- segments.getOrCreateSegmentIfLive(1, context, -1L);
- assertEquals(segment, segments.segmentForTimestamp(0L));
- }
-
- @Test
- public void shouldGetCorrectSegmentString() {
- final SessionSegmentWithHeaders segment =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- assertEquals("SessionSegmentWithHeaders(id=0, name=test.0)",
segment.toString());
- }
-
- @Test
- public void shouldCloseAllOpenSegments() {
- final SessionSegmentWithHeaders first =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- final SessionSegmentWithHeaders second =
segments.getOrCreateSegmentIfLive(1, context, -1L);
- final SessionSegmentWithHeaders third =
segments.getOrCreateSegmentIfLive(2, context, -1L);
- segments.close();
-
- assertFalse(first.isOpen());
- assertFalse(second.isOpen());
- assertFalse(third.isOpen());
+ public void shouldCreateSegmentsOfCorrectType() {
Review Comment:
New test
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java:
##########
@@ -66,132 +64,41 @@ public void close() {
}
@Test
- public void shouldGetSegmentIdsFromTimestamp() {
- assertEquals(0, segments.segmentId(0));
- assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
- assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
- assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
- }
-
- @Test
- public void shouldGetSegmentNameFromId() {
- assertEquals("test.0", segments.segmentName(0));
- assertEquals("test." + SEGMENT_INTERVAL, segments.segmentName(1));
- assertEquals("test." + 2 * SEGMENT_INTERVAL, segments.segmentName(2));
- }
-
- @Test
- public void shouldCreateSegments() {
- final TimestampedSegmentWithHeaders segment1 =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- final TimestampedSegmentWithHeaders segment2 =
segments.getOrCreateSegmentIfLive(1, context, -1L);
- final TimestampedSegmentWithHeaders segment3 =
segments.getOrCreateSegmentIfLive(2, context, -1L);
-
- assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
- assertTrue(new File(context.stateDir(), "test/test." +
SEGMENT_INTERVAL).isDirectory());
- assertTrue(new File(context.stateDir(), "test/test." + 2 *
SEGMENT_INTERVAL).isDirectory());
- assertTrue(segment1.isOpen());
- assertTrue(segment2.isOpen());
- assertTrue(segment3.isOpen());
- }
-
- @Test
- public void shouldNotCreateSegmentThatIsAlreadyExpired() {
- final long streamTime = updateStreamTimeAndCreateSegment(7);
- assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
- assertFalse(new File(context.stateDir(), "test/test.0").exists());
- }
-
- @Test
- public void shouldCleanupSegmentsThatHaveExpired() {
- final TimestampedSegmentWithHeaders segment1 =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- final TimestampedSegmentWithHeaders segment2 =
segments.getOrCreateSegmentIfLive(1, context, -1L);
- final TimestampedSegmentWithHeaders segment3 =
segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);
-
- assertFalse(segment1.isOpen());
- assertFalse(segment2.isOpen());
- assertTrue(segment3.isOpen());
- assertFalse(new File(context.stateDir(), "test/test.0").exists());
- assertFalse(new File(context.stateDir(), "test/test." +
SEGMENT_INTERVAL).exists());
- assertTrue(new File(context.stateDir(), "test/test." + 7 *
SEGMENT_INTERVAL).exists());
- }
-
- @Test
- public void shouldGetSegmentForTimestamp() {
- final TimestampedSegmentWithHeaders segment =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- segments.getOrCreateSegmentIfLive(1, context, -1L);
- assertEquals(segment, segments.segmentForTimestamp(0L));
- }
-
- @Test
- public void shouldGetCorrectSegmentString() {
- final TimestampedSegmentWithHeaders segment =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- assertEquals("TimestampedSegmentWithHeaders(id=0, name=test.0)",
segment.toString());
- }
-
- @Test
- public void shouldCloseAllOpenSegments() {
- final TimestampedSegmentWithHeaders first =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- final TimestampedSegmentWithHeaders second =
segments.getOrCreateSegmentIfLive(1, context, -1L);
- final TimestampedSegmentWithHeaders third =
segments.getOrCreateSegmentIfLive(2, context, -1L);
- segments.close();
-
- assertFalse(first.isOpen());
- assertFalse(second.isOpen());
- assertFalse(third.isOpen());
+ public void shouldCreateSegmentsOfCorrectType() {
+ final TimestampedSegmentWithHeaders segment =
segments.getOrCreateSegment(0, context);
+ assertNotNull(segment);
+ assertInstanceOf(TimestampedSegmentWithHeaders.class, segment);
+ assertEquals(0L, segment.id());
+ assertEquals("testStore.0", segment.name());
}
@Test
public void shouldOpenExistingSegments() {
Review Comment:
updated test
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java:
##########
@@ -40,11 +40,6 @@ protected KeyValueSegment createSegment(final long
segmentId, final String segme
return new KeyValueSegment(segmentName, name, segmentId, position,
metricsRecorder);
}
- @Override
- protected void openSegmentDB(final KeyValueSegment segment, final
StateStoreContext context) {
- segment.openDB(context.appConfigs(), context.stateDir());
- }
Review Comment:
Not needed any longer -- can re-use the new one from `AbstractSegments` now
-- if we don't like the implication of having a default impl in
`AbstractSegments` for this method, we would need to add it back here. Similar
for other segments classes.
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java:
##########
@@ -82,31 +81,36 @@ public void tearDown() {
}
@Test
- public void shouldGetSegmentIdsFromTimestamp() {
- assertEquals(0, segments.segmentId(0));
- assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
- assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
- assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
+ public void shouldCreateSegmentsOfCorrectType() {
Review Comment:
Newly added test.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -151,7 +157,7 @@ public synchronized void close() {
iterators = new HashSet<>(openIterators);
openIterators.clear();
}
- if (iterators.size() != 0) {
+ if (!iterators.isEmpty()) {
Review Comment:
Side cleanup
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java:
##########
@@ -74,282 +64,41 @@ public void close() {
}
@Test
- public void shouldGetSegmentIdsFromTimestamp() {
Review Comment:
These tests live in `AbstractSegmentsTest` now.
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java:
##########
@@ -74,282 +64,41 @@ public void close() {
}
@Test
- public void shouldGetSegmentIdsFromTimestamp() {
- assertEquals(0, segments.segmentId(0));
- assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
- assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
- assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
- }
-
- @Test
- public void shouldBaseSegmentIntervalOnRetentionAndNumSegments() {
- final KeyValueSegments segments = new KeyValueSegments("test",
METRICS_SCOPE, 8 * SEGMENT_INTERVAL, 2 * SEGMENT_INTERVAL);
- assertEquals(0, segments.segmentId(0));
- assertEquals(0, segments.segmentId(SEGMENT_INTERVAL));
- assertEquals(1, segments.segmentId(2 * SEGMENT_INTERVAL));
- }
-
- @Test
- public void shouldGetSegmentNameFromId() {
- assertEquals("test.0", segments.segmentName(0));
- assertEquals("test." + SEGMENT_INTERVAL, segments.segmentName(1));
- assertEquals("test." + 2 * SEGMENT_INTERVAL, segments.segmentName(2));
- }
-
- @Test
- public void shouldCreateSegments() {
- final KeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0,
context, -1L);
- final KeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1,
context, -1L);
- final KeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(2,
context, -1L);
- assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
- assertTrue(new File(context.stateDir(), "test/test." +
SEGMENT_INTERVAL).isDirectory());
- assertTrue(new File(context.stateDir(), "test/test." + 2 *
SEGMENT_INTERVAL).isDirectory());
- assertTrue(segment1.isOpen());
- assertTrue(segment2.isOpen());
- assertTrue(segment3.isOpen());
- }
-
- @Test
- public void shouldNotCreateSegmentThatIsAlreadyExpired() {
- final long streamTime = updateStreamTimeAndCreateSegment(7);
- assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
- assertFalse(new File(context.stateDir(), "test/test.0").exists());
- }
-
- @Test
- public void shouldCleanupSegmentsThatHaveExpired() {
- final KeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0,
context, -1L);
- final KeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1,
context, -1L);
- final KeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(7,
context, SEGMENT_INTERVAL * 7L);
- assertFalse(segment1.isOpen());
- assertFalse(segment2.isOpen());
- assertTrue(segment3.isOpen());
- assertFalse(new File(context.stateDir(), "test/test.0").exists());
- assertFalse(new File(context.stateDir(), "test/test." +
SEGMENT_INTERVAL).exists());
- assertTrue(new File(context.stateDir(), "test/test." + 7 *
SEGMENT_INTERVAL).exists());
- }
-
- @Test
- public void shouldGetSegmentForTimestamp() {
- final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(0,
context, -1L);
- segments.getOrCreateSegmentIfLive(1, context, -1L);
- assertEquals(segment, segments.segmentForTimestamp(0L));
- }
-
- @Test
- public void shouldGetCorrectSegmentString() {
- final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(0,
context, -1L);
- assertEquals("KeyValueSegment(id=0, name=test.0)", segment.toString());
- }
-
- @Test
- public void shouldCloseAllOpenSegments() {
- final KeyValueSegment first = segments.getOrCreateSegmentIfLive(0,
context, -1L);
- final KeyValueSegment second = segments.getOrCreateSegmentIfLive(1,
context, -1L);
- final KeyValueSegment third = segments.getOrCreateSegmentIfLive(2,
context, -1L);
- segments.close();
-
- assertFalse(first.isOpen());
- assertFalse(second.isOpen());
- assertFalse(third.isOpen());
+ public void shouldCreateSegmentsOfCorrectType() {
Review Comment:
New test to close test gap after previous refactoring
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java:
##########
@@ -72,7 +72,7 @@ protected LogicalKeyValueSegment createSegment(final long
segmentId, final Strin
}
@Override
- protected void openSegmentDB(final LogicalKeyValueSegment segment, final
StateStoreContext context) {
+ protected void openSegment(final LogicalKeyValueSegment segment, final
StateStoreContext context) {
// no-op -- a logical segment is just a view on an underlying physical
store
}
Review Comment:
Only `LogicalKeyValueSegments` cannot re-use the new default from
`AbstractSegments`, so keeping the override here.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeaders.java:
##########
@@ -40,20 +40,6 @@ protected SessionSegmentWithHeaders createSegment(final long
segmentId, final St
return new SessionSegmentWithHeaders(segmentName, name, segmentId,
position, metricsRecorder);
}
- @Override
- protected void openSegmentDB(final SessionSegmentWithHeaders segment,
final StateStoreContext context) {
- segment.openDB(context.appConfigs(), context.stateDir());
- }
-
- @Override
- public SessionSegmentWithHeaders getOrCreateSegmentIfLive(final long
segmentId,
Review Comment:
This override was never necessary -- I guess a race condition between Bill's
session-header-store PR, and my previous refactoring of `AbstractSegments`
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java:
##########
@@ -29,42 +29,32 @@
import org.junit.jupiter.api.Test;
import java.io.File;
-import java.nio.file.Files;
-import java.text.SimpleDateFormat;
-import java.util.Date;
import java.util.List;
-import java.util.SimpleTimeZone;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class KeyValueSegmentsTest {
- private static final int NUM_SEGMENTS = 5;
private static final long SEGMENT_INTERVAL = 100L;
private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
private static final String METRICS_SCOPE = "test-state-id";
- private InternalMockProcessorContext context;
+ private InternalMockProcessorContext<?, ?> context;
private KeyValueSegments segments;
- private File stateDirectory;
- private final String storeName = "test";
@BeforeEach
public void createContext() {
- stateDirectory = TestUtils.tempDirectory();
+ final File stateDirectory = TestUtils.tempDirectory();
context = new InternalMockProcessorContext<>(
stateDirectory,
Serdes.String(),
Serdes.Long(),
new MockRecordCollector(),
new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics()))
);
- segments = new KeyValueSegments(storeName, METRICS_SCOPE,
RETENTION_PERIOD, SEGMENT_INTERVAL);
+ segments = new KeyValueSegments("testStore", METRICS_SCOPE,
RETENTION_PERIOD, SEGMENT_INTERVAL);
Review Comment:
Just some side cleanup here and above.
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeadersTest.java:
##########
@@ -66,132 +64,41 @@ public void close() {
}
@Test
- public void shouldGetSegmentIdsFromTimestamp() {
Review Comment:
Moved to `AbstractSegmentsTest` now
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java:
##########
@@ -82,31 +81,36 @@ public void tearDown() {
}
@Test
- public void shouldGetSegmentIdsFromTimestamp() {
Review Comment:
Removed tests are now in `AbstractSegmentsTest`
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSegmentsTest.java:
##########
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.SimpleTimeZone;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AbstractSegmentsTest {
Review Comment:
We pull in all redundant methods of "segments subclass tests" into a single
unified test -- this avoids test redundancies.
The trade-off is that we need to add `TestSegments` and `TestSegment` which
is some boilerplate code...
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java:
##########
@@ -29,6 +31,13 @@ public interface Segment extends KeyValueStore<Bytes,
byte[]>, BatchWritingStore
void deleteRange(Bytes keyFrom, Bytes keyTo);
+ /*
+ * This method is only declared to all us to share more code.
+ *
+ * With this method we can provide a default implementation of
AbstractSegments#openSegmentDB(...).
+ */
+ void openDB(final Map<String, Object> configs, final File stateDir);
Review Comment:
If we don't want to make `RocksDBStore.openDB` public, this declaration would
go away.
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java:
##########
@@ -82,31 +81,36 @@ public void tearDown() {
}
@Test
- public void shouldGetSegmentIdsFromTimestamp() {
- assertEquals(0, segments.segmentId(0));
- assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
- assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
- assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
+ public void shouldCreateSegmentsOfCorrectType() {
+ final LogicalKeyValueSegment segment = segments.getOrCreateSegment(0,
context);
+ assertEquals(LogicalKeyValueSegment.class, segment.getClass());
+ assertEquals(0L, segment.id());
+ assertEquals("logical-segments.0", segment.name());
}
@Test
- public void shouldCreateSegments() {
- final LogicalKeyValueSegment segment1 =
segments.getOrCreateSegmentIfLive(0, context, 0L);
- final LogicalKeyValueSegment segment2 =
segments.getOrCreateSegmentIfLive(1, context, SEGMENT_INTERVAL);
- final LogicalKeyValueSegment segment3 =
segments.getOrCreateSegmentIfLive(2, context, 2 * SEGMENT_INTERVAL);
+ public void shouldOpenExistingPhysicalStore() {
Review Comment:
Newly added test -- `LogicalKeyValueSegments` is an outlier and needs some
special testing, because logical segments are just a view over a single physical
store.
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java:
##########
@@ -74,282 +64,41 @@ public void close() {
}
@Test
- public void shouldGetSegmentIdsFromTimestamp() {
- assertEquals(0, segments.segmentId(0));
- assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
- assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
- assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
- }
-
- @Test
- public void shouldBaseSegmentIntervalOnRetentionAndNumSegments() {
- final KeyValueSegments segments = new KeyValueSegments("test",
METRICS_SCOPE, 8 * SEGMENT_INTERVAL, 2 * SEGMENT_INTERVAL);
- assertEquals(0, segments.segmentId(0));
- assertEquals(0, segments.segmentId(SEGMENT_INTERVAL));
- assertEquals(1, segments.segmentId(2 * SEGMENT_INTERVAL));
- }
-
- @Test
- public void shouldGetSegmentNameFromId() {
- assertEquals("test.0", segments.segmentName(0));
- assertEquals("test." + SEGMENT_INTERVAL, segments.segmentName(1));
- assertEquals("test." + 2 * SEGMENT_INTERVAL, segments.segmentName(2));
- }
-
- @Test
- public void shouldCreateSegments() {
- final KeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0,
context, -1L);
- final KeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1,
context, -1L);
- final KeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(2,
context, -1L);
- assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
- assertTrue(new File(context.stateDir(), "test/test." +
SEGMENT_INTERVAL).isDirectory());
- assertTrue(new File(context.stateDir(), "test/test." + 2 *
SEGMENT_INTERVAL).isDirectory());
- assertTrue(segment1.isOpen());
- assertTrue(segment2.isOpen());
- assertTrue(segment3.isOpen());
- }
-
- @Test
- public void shouldNotCreateSegmentThatIsAlreadyExpired() {
- final long streamTime = updateStreamTimeAndCreateSegment(7);
- assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
- assertFalse(new File(context.stateDir(), "test/test.0").exists());
- }
-
- @Test
- public void shouldCleanupSegmentsThatHaveExpired() {
- final KeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0,
context, -1L);
- final KeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1,
context, -1L);
- final KeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(7,
context, SEGMENT_INTERVAL * 7L);
- assertFalse(segment1.isOpen());
- assertFalse(segment2.isOpen());
- assertTrue(segment3.isOpen());
- assertFalse(new File(context.stateDir(), "test/test.0").exists());
- assertFalse(new File(context.stateDir(), "test/test." +
SEGMENT_INTERVAL).exists());
- assertTrue(new File(context.stateDir(), "test/test." + 7 *
SEGMENT_INTERVAL).exists());
- }
-
- @Test
- public void shouldGetSegmentForTimestamp() {
- final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(0,
context, -1L);
- segments.getOrCreateSegmentIfLive(1, context, -1L);
- assertEquals(segment, segments.segmentForTimestamp(0L));
- }
-
- @Test
- public void shouldGetCorrectSegmentString() {
- final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(0,
context, -1L);
- assertEquals("KeyValueSegment(id=0, name=test.0)", segment.toString());
- }
-
- @Test
- public void shouldCloseAllOpenSegments() {
- final KeyValueSegment first = segments.getOrCreateSegmentIfLive(0,
context, -1L);
- final KeyValueSegment second = segments.getOrCreateSegmentIfLive(1,
context, -1L);
- final KeyValueSegment third = segments.getOrCreateSegmentIfLive(2,
context, -1L);
- segments.close();
-
- assertFalse(first.isOpen());
- assertFalse(second.isOpen());
- assertFalse(third.isOpen());
+ public void shouldCreateSegmentsOfCorrectType() {
+ final KeyValueSegment segment = segments.getOrCreateSegment(0,
context);
+ assertNotNull(segment);
+ assertInstanceOf(KeyValueSegment.class, segment);
+ assertEquals(0L, segment.id());
+ assertEquals("testStore.0", segment.name());
}
@Test
public void shouldOpenExistingSegments() {
Review Comment:
"New" test to close test gap after previous refactoring (ie, test has the
same name, but does test some different code now)
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java:
##########
@@ -74,283 +64,41 @@ public void close() {
}
@Test
- public void shouldGetSegmentIdsFromTimestamp() {
Review Comment:
Moved to `AbstractSegmentsTest` now
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/TestSegment.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import org.rocksdb.WriteBatchInterface;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Simple in-memory segment implementation for testing AbstractSegments.
+ */
+class TestSegment implements Segment {
+ final long id;
+ private final String name;
+ private boolean open = false;
+ private File dbDir;
Review Comment:
We use an actual file in the tests to see if segments got created...
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeadersTest.java:
##########
@@ -66,132 +64,41 @@ public void close() {
}
@Test
- public void shouldGetSegmentIdsFromTimestamp() {
- assertEquals(0, segments.segmentId(0));
- assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
- assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
- assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
- }
-
- @Test
- public void shouldGetSegmentNameFromId() {
- assertEquals("test.0", segments.segmentName(0));
- assertEquals("test." + SEGMENT_INTERVAL, segments.segmentName(1));
- assertEquals("test." + 2 * SEGMENT_INTERVAL, segments.segmentName(2));
- }
-
- @Test
- public void shouldCreateSegments() {
- final SessionSegmentWithHeaders segment1 =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- final SessionSegmentWithHeaders segment2 =
segments.getOrCreateSegmentIfLive(1, context, -1L);
- final SessionSegmentWithHeaders segment3 =
segments.getOrCreateSegmentIfLive(2, context, -1L);
-
- assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
- assertTrue(new File(context.stateDir(), "test/test." +
SEGMENT_INTERVAL).isDirectory());
- assertTrue(new File(context.stateDir(), "test/test." + 2 *
SEGMENT_INTERVAL).isDirectory());
- assertTrue(segment1.isOpen());
- assertTrue(segment2.isOpen());
- assertTrue(segment3.isOpen());
- }
-
- @Test
- public void shouldNotCreateSegmentThatIsAlreadyExpired() {
- final long streamTime = updateStreamTimeAndCreateSegment(7);
- assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
- assertFalse(new File(context.stateDir(), "test/test.0").exists());
- }
-
- @Test
- public void shouldCleanupSegmentsThatHaveExpired() {
- final SessionSegmentWithHeaders segment1 =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- final SessionSegmentWithHeaders segment2 =
segments.getOrCreateSegmentIfLive(1, context, -1L);
- final SessionSegmentWithHeaders segment3 =
segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);
-
- assertFalse(segment1.isOpen());
- assertFalse(segment2.isOpen());
- assertTrue(segment3.isOpen());
- assertFalse(new File(context.stateDir(), "test/test.0").exists());
- assertFalse(new File(context.stateDir(), "test/test." +
SEGMENT_INTERVAL).exists());
- assertTrue(new File(context.stateDir(), "test/test." + 7 *
SEGMENT_INTERVAL).exists());
- }
-
- @Test
- public void shouldGetSegmentForTimestamp() {
- final SessionSegmentWithHeaders segment =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- segments.getOrCreateSegmentIfLive(1, context, -1L);
- assertEquals(segment, segments.segmentForTimestamp(0L));
- }
-
- @Test
- public void shouldGetCorrectSegmentString() {
- final SessionSegmentWithHeaders segment =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- assertEquals("SessionSegmentWithHeaders(id=0, name=test.0)",
segment.toString());
- }
-
- @Test
- public void shouldCloseAllOpenSegments() {
- final SessionSegmentWithHeaders first =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- final SessionSegmentWithHeaders second =
segments.getOrCreateSegmentIfLive(1, context, -1L);
- final SessionSegmentWithHeaders third =
segments.getOrCreateSegmentIfLive(2, context, -1L);
- segments.close();
-
- assertFalse(first.isOpen());
- assertFalse(second.isOpen());
- assertFalse(third.isOpen());
+ public void shouldCreateSegmentsOfCorrectType() {
+ final SessionSegmentWithHeaders segment =
segments.getOrCreateSegment(0, context);
+ assertNotNull(segment);
+ assertInstanceOf(SessionSegmentWithHeaders.class, segment);
+ assertEquals(0L, segment.id());
+ assertEquals("testStore.0", segment.name());
}
@Test
public void shouldOpenExistingSegments() {
Review Comment:
Again: "same" test name, but the test logic is different now
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/TestSegment.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import org.rocksdb.WriteBatchInterface;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Simple in-memory segment implementation for testing AbstractSegments.
+ */
+class TestSegment implements Segment {
+ final long id;
+ private final String name;
+ private boolean open = false;
+ private File dbDir;
+
+ TestSegment(final String name, final long id) {
+ this.name = name;
+ this.id = id;
+ }
+
+ public long id() {
+ return id;
+ }
+
+ public void destroy() throws IOException {
+ if (dbDir != null && dbDir.exists()) {
+ deleteDirectory(dbDir);
+ }
+ }
+
+ public void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void openDB(final Map<String, Object> configs, final File stateDir)
{
+ if (stateDir != null) {
+ final String storeName = name.substring(0, name.indexOf('.'));
+ final File storeDir = new File(stateDir, storeName);
+ dbDir = new File(storeDir, name);
+ if (!dbDir.exists()) {
+ dbDir.mkdirs();
+ }
+ }
+ open = true;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public void init(final StateStoreContext context, final StateStore root) {
+ if (context.stateDir() != null) {
+ // Extract store name from segment name (e.g., "test.0" -> "test")
+ final String storeName = name.substring(0, name.indexOf('.'));
+ final File storeDir = new File(context.stateDir(), storeName);
+ dbDir = new File(storeDir, name);
+ if (!dbDir.exists()) {
+ dbDir.mkdirs();
+ }
+ }
+ open = true;
+ }
+
+ // no need to implement KeyValueStore methods
Review Comment:
Below is just boilerplate to make it compile... :(
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java:
##########
@@ -74,283 +64,41 @@ public void close() {
}
@Test
- public void shouldGetSegmentIdsFromTimestamp() {
- assertEquals(0, segments.segmentId(0));
- assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
- assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
- assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
- }
-
- @Test
- public void shouldBaseSegmentIntervalOnRetentionAndNumSegments() {
- final TimestampedSegments segments =
- new TimestampedSegments("test", METRICS_SCOPE, 8 *
SEGMENT_INTERVAL, 2 * SEGMENT_INTERVAL);
- assertEquals(0, segments.segmentId(0));
- assertEquals(0, segments.segmentId(SEGMENT_INTERVAL));
- assertEquals(1, segments.segmentId(2 * SEGMENT_INTERVAL));
- }
-
- @Test
- public void shouldGetSegmentNameFromId() {
- assertEquals("test.0", segments.segmentName(0));
- assertEquals("test." + SEGMENT_INTERVAL, segments.segmentName(1));
- assertEquals("test." + 2 * SEGMENT_INTERVAL, segments.segmentName(2));
- }
-
- @Test
- public void shouldCreateSegments() {
- final TimestampedSegment segment1 =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- final TimestampedSegment segment2 =
segments.getOrCreateSegmentIfLive(1, context, -1L);
- final TimestampedSegment segment3 =
segments.getOrCreateSegmentIfLive(2, context, -1L);
- assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
- assertTrue(new File(context.stateDir(), "test/test." +
SEGMENT_INTERVAL).isDirectory());
- assertTrue(new File(context.stateDir(), "test/test." + 2 *
SEGMENT_INTERVAL).isDirectory());
- assertTrue(segment1.isOpen());
- assertTrue(segment2.isOpen());
- assertTrue(segment3.isOpen());
- }
-
- @Test
- public void shouldNotCreateSegmentThatIsAlreadyExpired() {
- final long streamTime = updateStreamTimeAndCreateSegment(7);
- assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
- assertFalse(new File(context.stateDir(), "test/test.0").exists());
- }
-
- @Test
- public void shouldCleanupSegmentsThatHaveExpired() {
- final TimestampedSegment segment1 =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- final TimestampedSegment segment2 =
segments.getOrCreateSegmentIfLive(1, context, -1L);
- final TimestampedSegment segment3 =
segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);
- assertFalse(segment1.isOpen());
- assertFalse(segment2.isOpen());
- assertTrue(segment3.isOpen());
- assertFalse(new File(context.stateDir(), "test/test.0").exists());
- assertFalse(new File(context.stateDir(), "test/test." +
SEGMENT_INTERVAL).exists());
- assertTrue(new File(context.stateDir(), "test/test." + 7 *
SEGMENT_INTERVAL).exists());
- }
-
- @Test
- public void shouldGetSegmentForTimestamp() {
- final TimestampedSegment segment =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- segments.getOrCreateSegmentIfLive(1, context, -1L);
- assertEquals(segment, segments.segmentForTimestamp(0L));
- }
-
- @Test
- public void shouldGetCorrectSegmentString() {
- final TimestampedSegment segment =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- assertEquals("TimestampedSegment(id=0, name=test.0)",
segment.toString());
- }
-
- @Test
- public void shouldCloseAllOpenSegments() {
- final TimestampedSegment first = segments.getOrCreateSegmentIfLive(0,
context, -1L);
- final TimestampedSegment second = segments.getOrCreateSegmentIfLive(1,
context, -1L);
- final TimestampedSegment third = segments.getOrCreateSegmentIfLive(2,
context, -1L);
- segments.close();
-
- assertFalse(first.isOpen());
- assertFalse(second.isOpen());
- assertFalse(third.isOpen());
+ public void shouldCreateSegmentsOfCorrectType() {
Review Comment:
new test
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java:
##########
@@ -66,132 +64,41 @@ public void close() {
}
@Test
- public void shouldGetSegmentIdsFromTimestamp() {
- assertEquals(0, segments.segmentId(0));
- assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
- assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
- assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
- }
-
- @Test
- public void shouldGetSegmentNameFromId() {
- assertEquals("test.0", segments.segmentName(0));
- assertEquals("test." + SEGMENT_INTERVAL, segments.segmentName(1));
- assertEquals("test." + 2 * SEGMENT_INTERVAL, segments.segmentName(2));
- }
-
- @Test
- public void shouldCreateSegments() {
- final TimestampedSegmentWithHeaders segment1 =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- final TimestampedSegmentWithHeaders segment2 =
segments.getOrCreateSegmentIfLive(1, context, -1L);
- final TimestampedSegmentWithHeaders segment3 =
segments.getOrCreateSegmentIfLive(2, context, -1L);
-
- assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
- assertTrue(new File(context.stateDir(), "test/test." +
SEGMENT_INTERVAL).isDirectory());
- assertTrue(new File(context.stateDir(), "test/test." + 2 *
SEGMENT_INTERVAL).isDirectory());
- assertTrue(segment1.isOpen());
- assertTrue(segment2.isOpen());
- assertTrue(segment3.isOpen());
- }
-
- @Test
- public void shouldNotCreateSegmentThatIsAlreadyExpired() {
- final long streamTime = updateStreamTimeAndCreateSegment(7);
- assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
- assertFalse(new File(context.stateDir(), "test/test.0").exists());
- }
-
- @Test
- public void shouldCleanupSegmentsThatHaveExpired() {
- final TimestampedSegmentWithHeaders segment1 =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- final TimestampedSegmentWithHeaders segment2 =
segments.getOrCreateSegmentIfLive(1, context, -1L);
- final TimestampedSegmentWithHeaders segment3 =
segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);
-
- assertFalse(segment1.isOpen());
- assertFalse(segment2.isOpen());
- assertTrue(segment3.isOpen());
- assertFalse(new File(context.stateDir(), "test/test.0").exists());
- assertFalse(new File(context.stateDir(), "test/test." +
SEGMENT_INTERVAL).exists());
- assertTrue(new File(context.stateDir(), "test/test." + 7 *
SEGMENT_INTERVAL).exists());
- }
-
- @Test
- public void shouldGetSegmentForTimestamp() {
- final TimestampedSegmentWithHeaders segment =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- segments.getOrCreateSegmentIfLive(1, context, -1L);
- assertEquals(segment, segments.segmentForTimestamp(0L));
- }
-
- @Test
- public void shouldGetCorrectSegmentString() {
- final TimestampedSegmentWithHeaders segment =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- assertEquals("TimestampedSegmentWithHeaders(id=0, name=test.0)",
segment.toString());
- }
-
- @Test
- public void shouldCloseAllOpenSegments() {
- final TimestampedSegmentWithHeaders first =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- final TimestampedSegmentWithHeaders second =
segments.getOrCreateSegmentIfLive(1, context, -1L);
- final TimestampedSegmentWithHeaders third =
segments.getOrCreateSegmentIfLive(2, context, -1L);
- segments.close();
-
- assertFalse(first.isOpen());
- assertFalse(second.isOpen());
- assertFalse(third.isOpen());
+ public void shouldCreateSegmentsOfCorrectType() {
Review Comment:
new test
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java:
##########
@@ -66,132 +64,41 @@ public void close() {
}
@Test
- public void shouldGetSegmentIdsFromTimestamp() {
Review Comment:
Same. Redundant test unified in `AbstractSegmentsTest`
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java:
##########
@@ -74,283 +64,41 @@ public void close() {
}
@Test
- public void shouldGetSegmentIdsFromTimestamp() {
- assertEquals(0, segments.segmentId(0));
- assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
- assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
- assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
- }
-
- @Test
- public void shouldBaseSegmentIntervalOnRetentionAndNumSegments() {
- final TimestampedSegments segments =
- new TimestampedSegments("test", METRICS_SCOPE, 8 *
SEGMENT_INTERVAL, 2 * SEGMENT_INTERVAL);
- assertEquals(0, segments.segmentId(0));
- assertEquals(0, segments.segmentId(SEGMENT_INTERVAL));
- assertEquals(1, segments.segmentId(2 * SEGMENT_INTERVAL));
- }
-
- @Test
- public void shouldGetSegmentNameFromId() {
- assertEquals("test.0", segments.segmentName(0));
- assertEquals("test." + SEGMENT_INTERVAL, segments.segmentName(1));
- assertEquals("test." + 2 * SEGMENT_INTERVAL, segments.segmentName(2));
- }
-
- @Test
- public void shouldCreateSegments() {
- final TimestampedSegment segment1 =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- final TimestampedSegment segment2 =
segments.getOrCreateSegmentIfLive(1, context, -1L);
- final TimestampedSegment segment3 =
segments.getOrCreateSegmentIfLive(2, context, -1L);
- assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
- assertTrue(new File(context.stateDir(), "test/test." +
SEGMENT_INTERVAL).isDirectory());
- assertTrue(new File(context.stateDir(), "test/test." + 2 *
SEGMENT_INTERVAL).isDirectory());
- assertTrue(segment1.isOpen());
- assertTrue(segment2.isOpen());
- assertTrue(segment3.isOpen());
- }
-
- @Test
- public void shouldNotCreateSegmentThatIsAlreadyExpired() {
- final long streamTime = updateStreamTimeAndCreateSegment(7);
- assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
- assertFalse(new File(context.stateDir(), "test/test.0").exists());
- }
-
- @Test
- public void shouldCleanupSegmentsThatHaveExpired() {
- final TimestampedSegment segment1 =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- final TimestampedSegment segment2 =
segments.getOrCreateSegmentIfLive(1, context, -1L);
- final TimestampedSegment segment3 =
segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);
- assertFalse(segment1.isOpen());
- assertFalse(segment2.isOpen());
- assertTrue(segment3.isOpen());
- assertFalse(new File(context.stateDir(), "test/test.0").exists());
- assertFalse(new File(context.stateDir(), "test/test." +
SEGMENT_INTERVAL).exists());
- assertTrue(new File(context.stateDir(), "test/test." + 7 *
SEGMENT_INTERVAL).exists());
- }
-
- @Test
- public void shouldGetSegmentForTimestamp() {
- final TimestampedSegment segment =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- segments.getOrCreateSegmentIfLive(1, context, -1L);
- assertEquals(segment, segments.segmentForTimestamp(0L));
- }
-
- @Test
- public void shouldGetCorrectSegmentString() {
- final TimestampedSegment segment =
segments.getOrCreateSegmentIfLive(0, context, -1L);
- assertEquals("TimestampedSegment(id=0, name=test.0)",
segment.toString());
- }
-
- @Test
- public void shouldCloseAllOpenSegments() {
- final TimestampedSegment first = segments.getOrCreateSegmentIfLive(0,
context, -1L);
- final TimestampedSegment second = segments.getOrCreateSegmentIfLive(1,
context, -1L);
- final TimestampedSegment third = segments.getOrCreateSegmentIfLive(2,
context, -1L);
- segments.close();
-
- assertFalse(first.isOpen());
- assertFalse(second.isOpen());
- assertFalse(third.isOpen());
+ public void shouldCreateSegmentsOfCorrectType() {
+ final TimestampedSegment segment = segments.getOrCreateSegment(0,
context);
+ assertNotNull(segment);
+ assertInstanceOf(TimestampedSegment.class, segment);
+ assertEquals(0L, segment.id());
+ assertEquals("testStore.0", segment.name());
}
@Test
public void shouldOpenExistingSegments() {
Review Comment:
Updated test
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]