mjsax commented on code in PR #21674:
URL: https://github.com/apache/kafka/pull/21674#discussion_r2902737486


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java:
##########
@@ -49,17 +46,6 @@ public void destroy() throws IOException {
         Utils.delete(dbDir);
     }
 
-    @Override
-    public synchronized void deleteRange(final Bytes keyFrom, final Bytes 
keyTo) {
-        super.deleteRange(keyFrom, keyTo);
-    }
-
-    @Override
-    public void openDB(final Map<String, Object> configs, final File stateDir) 
{
-        super.openDB(configs, stateDir);
-        // skip the registering step
-    }

Review Comment:
   If we make `openDB` public on `RocksDBStore`, we don't need this override 
any longer.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########
@@ -181,8 +181,11 @@ public void init(final StateStoreContext stateStoreContext,
             false);
     }
 
+    // This method must be public, to allow us to share code inside 
AbstractSegments#openSegmentDB(...)
+    // We declare the same method on interface Segment.openDB(...) to make it 
accessible
+    // and interface methods are `public`
     @SuppressWarnings("unchecked")
-    void openDB(final Map<String, Object> configs, final File stateDir) {
+    public void openDB(final Map<String, Object> configs, final File stateDir) 
{

Review Comment:
   This is the somewhat undesired, but necessary, side effect: we need to make 
this `public` if we want to share more "segments code" -- let me know what you 
think about this tradeoff.
   
   It's not super clean TBH, as it does break encapsulation; if we think the 
benefit is too small (to save a little bit of duplicated code), also very happy 
to revert this part.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java:
##########
@@ -59,7 +59,9 @@ abstract class AbstractSegments<S extends Segment> implements 
Segments<S> {
 
     protected abstract S createSegment(long segmentId, String segmentName);
 
-    protected abstract void openSegmentDB(final S segment, final 
StateStoreContext context);
+    protected void openSegment(final S segment, final StateStoreContext 
context) {
+        segment.openDB(context.appConfigs(), context.stateDir());
+    }

Review Comment:
   Let me know what you think of this -- I thought it would be good to have a 
default impl, which works for most `Segments` but it's a little bit of a hack 
to make it work. Cf other comments.



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java:
##########
@@ -126,4 +126,11 @@ public void shouldCompareSegmentIdOnly() {
         segment2.close();
         segment3.close();
     }
+
+    @Test
+    public void shouldGetCorrectSegmentString() {

Review Comment:
   Closing small test gap.



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeadersTest.java:
##########
@@ -66,132 +64,41 @@ public void close() {
     }
 
     @Test
-    public void shouldGetSegmentIdsFromTimestamp() {
-        assertEquals(0, segments.segmentId(0));
-        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
-        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
-        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
-    }
-
-    @Test
-    public void shouldGetSegmentNameFromId() {
-        assertEquals("test.0", segments.segmentName(0));
-        assertEquals("test." + SEGMENT_INTERVAL, segments.segmentName(1));
-        assertEquals("test." + 2 * SEGMENT_INTERVAL, segments.segmentName(2));
-    }
-
-    @Test
-    public void shouldCreateSegments() {
-        final SessionSegmentWithHeaders segment1 = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        final SessionSegmentWithHeaders segment2 = 
segments.getOrCreateSegmentIfLive(1, context, -1L);
-        final SessionSegmentWithHeaders segment3 = 
segments.getOrCreateSegmentIfLive(2, context, -1L);
-
-        assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
-        assertTrue(new File(context.stateDir(), "test/test." + 
SEGMENT_INTERVAL).isDirectory());
-        assertTrue(new File(context.stateDir(), "test/test." + 2 * 
SEGMENT_INTERVAL).isDirectory());
-        assertTrue(segment1.isOpen());
-        assertTrue(segment2.isOpen());
-        assertTrue(segment3.isOpen());
-    }
-
-    @Test
-    public void shouldNotCreateSegmentThatIsAlreadyExpired() {
-        final long streamTime = updateStreamTimeAndCreateSegment(7);
-        assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
-        assertFalse(new File(context.stateDir(), "test/test.0").exists());
-    }
-
-    @Test
-    public void shouldCleanupSegmentsThatHaveExpired() {
-        final SessionSegmentWithHeaders segment1 = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        final SessionSegmentWithHeaders segment2 = 
segments.getOrCreateSegmentIfLive(1, context, -1L);
-        final SessionSegmentWithHeaders segment3 = 
segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);
-
-        assertFalse(segment1.isOpen());
-        assertFalse(segment2.isOpen());
-        assertTrue(segment3.isOpen());
-        assertFalse(new File(context.stateDir(), "test/test.0").exists());
-        assertFalse(new File(context.stateDir(), "test/test." + 
SEGMENT_INTERVAL).exists());
-        assertTrue(new File(context.stateDir(), "test/test." + 7 * 
SEGMENT_INTERVAL).exists());
-    }
-
-    @Test
-    public void shouldGetSegmentForTimestamp() {
-        final SessionSegmentWithHeaders segment = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        segments.getOrCreateSegmentIfLive(1, context, -1L);
-        assertEquals(segment, segments.segmentForTimestamp(0L));
-    }
-
-    @Test
-    public void shouldGetCorrectSegmentString() {
-        final SessionSegmentWithHeaders segment = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        assertEquals("SessionSegmentWithHeaders(id=0, name=test.0)", 
segment.toString());
-    }
-
-    @Test
-    public void shouldCloseAllOpenSegments() {
-        final SessionSegmentWithHeaders first = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        final SessionSegmentWithHeaders second = 
segments.getOrCreateSegmentIfLive(1, context, -1L);
-        final SessionSegmentWithHeaders third = 
segments.getOrCreateSegmentIfLive(2, context, -1L);
-        segments.close();
-
-        assertFalse(first.isOpen());
-        assertFalse(second.isOpen());
-        assertFalse(third.isOpen());
+    public void shouldCreateSegmentsOfCorrectType() {

Review Comment:
   New test



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java:
##########
@@ -66,132 +64,41 @@ public void close() {
     }
 
     @Test
-    public void shouldGetSegmentIdsFromTimestamp() {
-        assertEquals(0, segments.segmentId(0));
-        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
-        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
-        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
-    }
-
-    @Test
-    public void shouldGetSegmentNameFromId() {
-        assertEquals("test.0", segments.segmentName(0));
-        assertEquals("test." + SEGMENT_INTERVAL, segments.segmentName(1));
-        assertEquals("test." + 2 * SEGMENT_INTERVAL, segments.segmentName(2));
-    }
-
-    @Test
-    public void shouldCreateSegments() {
-        final TimestampedSegmentWithHeaders segment1 = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        final TimestampedSegmentWithHeaders segment2 = 
segments.getOrCreateSegmentIfLive(1, context, -1L);
-        final TimestampedSegmentWithHeaders segment3 = 
segments.getOrCreateSegmentIfLive(2, context, -1L);
-
-        assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
-        assertTrue(new File(context.stateDir(), "test/test." + 
SEGMENT_INTERVAL).isDirectory());
-        assertTrue(new File(context.stateDir(), "test/test." + 2 * 
SEGMENT_INTERVAL).isDirectory());
-        assertTrue(segment1.isOpen());
-        assertTrue(segment2.isOpen());
-        assertTrue(segment3.isOpen());
-    }
-
-    @Test
-    public void shouldNotCreateSegmentThatIsAlreadyExpired() {
-        final long streamTime = updateStreamTimeAndCreateSegment(7);
-        assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
-        assertFalse(new File(context.stateDir(), "test/test.0").exists());
-    }
-
-    @Test
-    public void shouldCleanupSegmentsThatHaveExpired() {
-        final TimestampedSegmentWithHeaders segment1 = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        final TimestampedSegmentWithHeaders segment2 = 
segments.getOrCreateSegmentIfLive(1, context, -1L);
-        final TimestampedSegmentWithHeaders segment3 = 
segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);
-
-        assertFalse(segment1.isOpen());
-        assertFalse(segment2.isOpen());
-        assertTrue(segment3.isOpen());
-        assertFalse(new File(context.stateDir(), "test/test.0").exists());
-        assertFalse(new File(context.stateDir(), "test/test." + 
SEGMENT_INTERVAL).exists());
-        assertTrue(new File(context.stateDir(), "test/test." + 7 * 
SEGMENT_INTERVAL).exists());
-    }
-
-    @Test
-    public void shouldGetSegmentForTimestamp() {
-        final TimestampedSegmentWithHeaders segment = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        segments.getOrCreateSegmentIfLive(1, context, -1L);
-        assertEquals(segment, segments.segmentForTimestamp(0L));
-    }
-
-    @Test
-    public void shouldGetCorrectSegmentString() {
-        final TimestampedSegmentWithHeaders segment = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        assertEquals("TimestampedSegmentWithHeaders(id=0, name=test.0)", 
segment.toString());
-    }
-
-    @Test
-    public void shouldCloseAllOpenSegments() {
-        final TimestampedSegmentWithHeaders first = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        final TimestampedSegmentWithHeaders second = 
segments.getOrCreateSegmentIfLive(1, context, -1L);
-        final TimestampedSegmentWithHeaders third = 
segments.getOrCreateSegmentIfLive(2, context, -1L);
-        segments.close();
-
-        assertFalse(first.isOpen());
-        assertFalse(second.isOpen());
-        assertFalse(third.isOpen());
+    public void shouldCreateSegmentsOfCorrectType() {
+        final TimestampedSegmentWithHeaders segment = 
segments.getOrCreateSegment(0, context);
+        assertNotNull(segment);
+        assertInstanceOf(TimestampedSegmentWithHeaders.class, segment);
+        assertEquals(0L, segment.id());
+        assertEquals("testStore.0", segment.name());
     }
 
     @Test
     public void shouldOpenExistingSegments() {

Review Comment:
   updated test



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java:
##########
@@ -40,11 +40,6 @@ protected KeyValueSegment createSegment(final long 
segmentId, final String segme
         return new KeyValueSegment(segmentName, name, segmentId, position, 
metricsRecorder);
     }
 
-    @Override
-    protected void openSegmentDB(final KeyValueSegment segment, final 
StateStoreContext context) {
-        segment.openDB(context.appConfigs(), context.stateDir());
-    }

Review Comment:
   Not needed any longer -- we can re-use the new one from `AbstractSegments` 
now -- if we don't like the implication of having a default impl in 
`AbstractSegments` for this method, we would need to add it back here. Similar 
for other segments classes.



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java:
##########
@@ -82,31 +81,36 @@ public void tearDown() {
     }
 
     @Test
-    public void shouldGetSegmentIdsFromTimestamp() {
-        assertEquals(0, segments.segmentId(0));
-        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
-        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
-        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
+    public void shouldCreateSegmentsOfCorrectType() {

Review Comment:
   Newly added test.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -151,7 +157,7 @@ public synchronized void close() {
             iterators = new HashSet<>(openIterators);
             openIterators.clear();
         }
-        if (iterators.size() != 0) {
+        if (!iterators.isEmpty()) {

Review Comment:
   Side cleanup



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java:
##########
@@ -74,282 +64,41 @@ public void close() {
     }
 
     @Test
-    public void shouldGetSegmentIdsFromTimestamp() {

Review Comment:
   These tests live in `AbstractSegmentsTest` now.



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java:
##########
@@ -74,282 +64,41 @@ public void close() {
     }
 
     @Test
-    public void shouldGetSegmentIdsFromTimestamp() {
-        assertEquals(0, segments.segmentId(0));
-        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
-        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
-        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
-    }
-
-    @Test
-    public void shouldBaseSegmentIntervalOnRetentionAndNumSegments() {
-        final KeyValueSegments segments = new KeyValueSegments("test", 
METRICS_SCOPE, 8 * SEGMENT_INTERVAL, 2 * SEGMENT_INTERVAL);
-        assertEquals(0, segments.segmentId(0));
-        assertEquals(0, segments.segmentId(SEGMENT_INTERVAL));
-        assertEquals(1, segments.segmentId(2 * SEGMENT_INTERVAL));
-    }
-
-    @Test
-    public void shouldGetSegmentNameFromId() {
-        assertEquals("test.0", segments.segmentName(0));
-        assertEquals("test." + SEGMENT_INTERVAL, segments.segmentName(1));
-        assertEquals("test." + 2 * SEGMENT_INTERVAL, segments.segmentName(2));
-    }
-
-    @Test
-    public void shouldCreateSegments() {
-        final KeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, 
context, -1L);
-        final KeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, 
context, -1L);
-        final KeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(2, 
context, -1L);
-        assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
-        assertTrue(new File(context.stateDir(), "test/test." + 
SEGMENT_INTERVAL).isDirectory());
-        assertTrue(new File(context.stateDir(), "test/test." + 2 * 
SEGMENT_INTERVAL).isDirectory());
-        assertTrue(segment1.isOpen());
-        assertTrue(segment2.isOpen());
-        assertTrue(segment3.isOpen());
-    }
-
-    @Test
-    public void shouldNotCreateSegmentThatIsAlreadyExpired() {
-        final long streamTime = updateStreamTimeAndCreateSegment(7);
-        assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
-        assertFalse(new File(context.stateDir(), "test/test.0").exists());
-    }
-
-    @Test
-    public void shouldCleanupSegmentsThatHaveExpired() {
-        final KeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, 
context, -1L);
-        final KeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, 
context, -1L);
-        final KeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(7, 
context, SEGMENT_INTERVAL * 7L);
-        assertFalse(segment1.isOpen());
-        assertFalse(segment2.isOpen());
-        assertTrue(segment3.isOpen());
-        assertFalse(new File(context.stateDir(), "test/test.0").exists());
-        assertFalse(new File(context.stateDir(), "test/test." + 
SEGMENT_INTERVAL).exists());
-        assertTrue(new File(context.stateDir(), "test/test." + 7 * 
SEGMENT_INTERVAL).exists());
-    }
-
-    @Test
-    public void shouldGetSegmentForTimestamp() {
-        final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(0, 
context, -1L);
-        segments.getOrCreateSegmentIfLive(1, context, -1L);
-        assertEquals(segment, segments.segmentForTimestamp(0L));
-    }
-
-    @Test
-    public void shouldGetCorrectSegmentString() {
-        final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(0, 
context, -1L);
-        assertEquals("KeyValueSegment(id=0, name=test.0)", segment.toString());
-    }
-
-    @Test
-    public void shouldCloseAllOpenSegments() {
-        final KeyValueSegment first = segments.getOrCreateSegmentIfLive(0, 
context, -1L);
-        final KeyValueSegment second = segments.getOrCreateSegmentIfLive(1, 
context, -1L);
-        final KeyValueSegment third = segments.getOrCreateSegmentIfLive(2, 
context, -1L);
-        segments.close();
-
-        assertFalse(first.isOpen());
-        assertFalse(second.isOpen());
-        assertFalse(third.isOpen());
+    public void shouldCreateSegmentsOfCorrectType() {

Review Comment:
   New test to close test gap after previous refactoring



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java:
##########
@@ -72,7 +72,7 @@ protected LogicalKeyValueSegment createSegment(final long 
segmentId, final Strin
     }
 
     @Override
-    protected void openSegmentDB(final LogicalKeyValueSegment segment, final 
StateStoreContext context) {
+    protected void openSegment(final LogicalKeyValueSegment segment, final 
StateStoreContext context) {
         // no-op -- a logical segment is just a view on an underlying physical 
store
     }

Review Comment:
   Only `LogicalKeyValueSegments` cannot re-use the new default from 
`AbstractSegments`, so keeping the override here.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeaders.java:
##########
@@ -40,20 +40,6 @@ protected SessionSegmentWithHeaders createSegment(final long 
segmentId, final St
         return new SessionSegmentWithHeaders(segmentName, name, segmentId, 
position, metricsRecorder);
     }
 
-    @Override
-    protected void openSegmentDB(final SessionSegmentWithHeaders segment, 
final StateStoreContext context) {
-        segment.openDB(context.appConfigs(), context.stateDir());
-    }
-
-    @Override
-    public SessionSegmentWithHeaders getOrCreateSegmentIfLive(final long 
segmentId,

Review Comment:
   This override was never necessary -- I guess it was a race condition between 
Bill's session-header-store PR and my previous refactoring of `AbstractSegments`



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java:
##########
@@ -29,42 +29,32 @@
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
-import java.nio.file.Files;
-import java.text.SimpleDateFormat;
-import java.util.Date;
 import java.util.List;
-import java.util.SimpleTimeZone;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class KeyValueSegmentsTest {
 
-    private static final int NUM_SEGMENTS = 5;
     private static final long SEGMENT_INTERVAL = 100L;
     private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
     private static final String METRICS_SCOPE = "test-state-id";
-    private InternalMockProcessorContext context;
+    private InternalMockProcessorContext<?, ?> context;
     private KeyValueSegments segments;
-    private File stateDirectory;
-    private final String storeName = "test";
 
     @BeforeEach
     public void createContext() {
-        stateDirectory = TestUtils.tempDirectory();
+        final File stateDirectory = TestUtils.tempDirectory();
         context = new InternalMockProcessorContext<>(
             stateDirectory,
             Serdes.String(),
             Serdes.Long(),
             new MockRecordCollector(),
             new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics()))
         );
-        segments = new KeyValueSegments(storeName, METRICS_SCOPE, 
RETENTION_PERIOD, SEGMENT_INTERVAL);
+        segments = new KeyValueSegments("testStore", METRICS_SCOPE, 
RETENTION_PERIOD, SEGMENT_INTERVAL);

Review Comment:
   Just some side cleanup here and above.



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeadersTest.java:
##########
@@ -66,132 +64,41 @@ public void close() {
     }
 
     @Test
-    public void shouldGetSegmentIdsFromTimestamp() {

Review Comment:
   Moved to `AbstractSegmentsTest` now



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java:
##########
@@ -82,31 +81,36 @@ public void tearDown() {
     }
 
     @Test
-    public void shouldGetSegmentIdsFromTimestamp() {

Review Comment:
   Removed tests are now in `AbstractSegmentsTest`



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSegmentsTest.java:
##########
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.SimpleTimeZone;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AbstractSegmentsTest {

Review Comment:
   We pull all redundant methods of the "segments subclass tests" into a single 
unified test -- this avoids test redundancies.
   
   The trade-off is, that we need to add `TestSegments` and `TestSegment` which 
is some boilerplate code...



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java:
##########
@@ -29,6 +31,13 @@ public interface Segment extends KeyValueStore<Bytes, 
byte[]>, BatchWritingStore
 
     void deleteRange(Bytes keyFrom, Bytes keyTo);
 
+    /*
+     * This method is only declared to all us to share more code.
+     *
+     * With this method we can provide a default implementation of 
AbstractSegments#openSegmentDB(...).
+     */
+    void openDB(final Map<String, Object> configs, final File stateDir);

Review Comment:
   If we don't want to make `RocksDBStore.openDB` public, this declaration would 
go away.



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java:
##########
@@ -82,31 +81,36 @@ public void tearDown() {
     }
 
     @Test
-    public void shouldGetSegmentIdsFromTimestamp() {
-        assertEquals(0, segments.segmentId(0));
-        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
-        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
-        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
+    public void shouldCreateSegmentsOfCorrectType() {
+        final LogicalKeyValueSegment segment = segments.getOrCreateSegment(0, 
context);
+        assertEquals(LogicalKeyValueSegment.class, segment.getClass());
+        assertEquals(0L, segment.id());
+        assertEquals("logical-segments.0", segment.name());
     }
 
     @Test
-    public void shouldCreateSegments() {
-        final LogicalKeyValueSegment segment1 = 
segments.getOrCreateSegmentIfLive(0, context, 0L);
-        final LogicalKeyValueSegment segment2 = 
segments.getOrCreateSegmentIfLive(1, context, SEGMENT_INTERVAL);
-        final LogicalKeyValueSegment segment3 = 
segments.getOrCreateSegmentIfLive(2, context, 2 * SEGMENT_INTERVAL);
+    public void shouldOpenExistingPhysicalStore() {

Review Comment:
   Newly added test -- `LogicalKeyValueSegments` is an outlier and needs some 
special testing, because logical segments are just a view over a single physical 
store.



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java:
##########
@@ -74,282 +64,41 @@ public void close() {
     }
 
     @Test
-    public void shouldGetSegmentIdsFromTimestamp() {
-        assertEquals(0, segments.segmentId(0));
-        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
-        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
-        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
-    }
-
-    @Test
-    public void shouldBaseSegmentIntervalOnRetentionAndNumSegments() {
-        final KeyValueSegments segments = new KeyValueSegments("test", 
METRICS_SCOPE, 8 * SEGMENT_INTERVAL, 2 * SEGMENT_INTERVAL);
-        assertEquals(0, segments.segmentId(0));
-        assertEquals(0, segments.segmentId(SEGMENT_INTERVAL));
-        assertEquals(1, segments.segmentId(2 * SEGMENT_INTERVAL));
-    }
-
-    @Test
-    public void shouldGetSegmentNameFromId() {
-        assertEquals("test.0", segments.segmentName(0));
-        assertEquals("test." + SEGMENT_INTERVAL, segments.segmentName(1));
-        assertEquals("test." + 2 * SEGMENT_INTERVAL, segments.segmentName(2));
-    }
-
-    @Test
-    public void shouldCreateSegments() {
-        final KeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, 
context, -1L);
-        final KeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, 
context, -1L);
-        final KeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(2, 
context, -1L);
-        assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
-        assertTrue(new File(context.stateDir(), "test/test." + 
SEGMENT_INTERVAL).isDirectory());
-        assertTrue(new File(context.stateDir(), "test/test." + 2 * 
SEGMENT_INTERVAL).isDirectory());
-        assertTrue(segment1.isOpen());
-        assertTrue(segment2.isOpen());
-        assertTrue(segment3.isOpen());
-    }
-
-    @Test
-    public void shouldNotCreateSegmentThatIsAlreadyExpired() {
-        final long streamTime = updateStreamTimeAndCreateSegment(7);
-        assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
-        assertFalse(new File(context.stateDir(), "test/test.0").exists());
-    }
-
-    @Test
-    public void shouldCleanupSegmentsThatHaveExpired() {
-        final KeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, 
context, -1L);
-        final KeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, 
context, -1L);
-        final KeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(7, 
context, SEGMENT_INTERVAL * 7L);
-        assertFalse(segment1.isOpen());
-        assertFalse(segment2.isOpen());
-        assertTrue(segment3.isOpen());
-        assertFalse(new File(context.stateDir(), "test/test.0").exists());
-        assertFalse(new File(context.stateDir(), "test/test." + 
SEGMENT_INTERVAL).exists());
-        assertTrue(new File(context.stateDir(), "test/test." + 7 * 
SEGMENT_INTERVAL).exists());
-    }
-
-    @Test
-    public void shouldGetSegmentForTimestamp() {
-        final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(0, 
context, -1L);
-        segments.getOrCreateSegmentIfLive(1, context, -1L);
-        assertEquals(segment, segments.segmentForTimestamp(0L));
-    }
-
-    @Test
-    public void shouldGetCorrectSegmentString() {
-        final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(0, 
context, -1L);
-        assertEquals("KeyValueSegment(id=0, name=test.0)", segment.toString());
-    }
-
-    @Test
-    public void shouldCloseAllOpenSegments() {
-        final KeyValueSegment first = segments.getOrCreateSegmentIfLive(0, 
context, -1L);
-        final KeyValueSegment second = segments.getOrCreateSegmentIfLive(1, 
context, -1L);
-        final KeyValueSegment third = segments.getOrCreateSegmentIfLive(2, 
context, -1L);
-        segments.close();
-
-        assertFalse(first.isOpen());
-        assertFalse(second.isOpen());
-        assertFalse(third.isOpen());
+    public void shouldCreateSegmentsOfCorrectType() {
+        final KeyValueSegment segment = segments.getOrCreateSegment(0, 
context);
+        assertNotNull(segment);
+        assertInstanceOf(KeyValueSegment.class, segment);
+        assertEquals(0L, segment.id());
+        assertEquals("testStore.0", segment.name());
     }
 
     @Test
     public void shouldOpenExistingSegments() {

Review Comment:
   "New" test to close the test gap after the previous refactoring (i.e., the 
test has the same name, but tests different code now)



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java:
##########
@@ -74,283 +64,41 @@ public void close() {
     }
 
     @Test
-    public void shouldGetSegmentIdsFromTimestamp() {

Review Comment:
   Moved to `AbstractSegmentsTest` now



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/TestSegment.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import org.rocksdb.WriteBatchInterface;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Simple in-memory segment implementation for testing AbstractSegments.
+ */
+class TestSegment implements Segment {
+    final long id;
+    private final String name;
+    private boolean open = false;
+    private File dbDir;

Review Comment:
   We use an actual file in the tests to see if segments got created...



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeadersTest.java:
##########
@@ -66,132 +64,41 @@ public void close() {
     }
 
     @Test
-    public void shouldGetSegmentIdsFromTimestamp() {
-        assertEquals(0, segments.segmentId(0));
-        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
-        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
-        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
-    }
-
-    @Test
-    public void shouldGetSegmentNameFromId() {
-        assertEquals("test.0", segments.segmentName(0));
-        assertEquals("test." + SEGMENT_INTERVAL, segments.segmentName(1));
-        assertEquals("test." + 2 * SEGMENT_INTERVAL, segments.segmentName(2));
-    }
-
-    @Test
-    public void shouldCreateSegments() {
-        final SessionSegmentWithHeaders segment1 = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        final SessionSegmentWithHeaders segment2 = 
segments.getOrCreateSegmentIfLive(1, context, -1L);
-        final SessionSegmentWithHeaders segment3 = 
segments.getOrCreateSegmentIfLive(2, context, -1L);
-
-        assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
-        assertTrue(new File(context.stateDir(), "test/test." + 
SEGMENT_INTERVAL).isDirectory());
-        assertTrue(new File(context.stateDir(), "test/test." + 2 * 
SEGMENT_INTERVAL).isDirectory());
-        assertTrue(segment1.isOpen());
-        assertTrue(segment2.isOpen());
-        assertTrue(segment3.isOpen());
-    }
-
-    @Test
-    public void shouldNotCreateSegmentThatIsAlreadyExpired() {
-        final long streamTime = updateStreamTimeAndCreateSegment(7);
-        assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
-        assertFalse(new File(context.stateDir(), "test/test.0").exists());
-    }
-
-    @Test
-    public void shouldCleanupSegmentsThatHaveExpired() {
-        final SessionSegmentWithHeaders segment1 = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        final SessionSegmentWithHeaders segment2 = 
segments.getOrCreateSegmentIfLive(1, context, -1L);
-        final SessionSegmentWithHeaders segment3 = 
segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);
-
-        assertFalse(segment1.isOpen());
-        assertFalse(segment2.isOpen());
-        assertTrue(segment3.isOpen());
-        assertFalse(new File(context.stateDir(), "test/test.0").exists());
-        assertFalse(new File(context.stateDir(), "test/test." + 
SEGMENT_INTERVAL).exists());
-        assertTrue(new File(context.stateDir(), "test/test." + 7 * 
SEGMENT_INTERVAL).exists());
-    }
-
-    @Test
-    public void shouldGetSegmentForTimestamp() {
-        final SessionSegmentWithHeaders segment = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        segments.getOrCreateSegmentIfLive(1, context, -1L);
-        assertEquals(segment, segments.segmentForTimestamp(0L));
-    }
-
-    @Test
-    public void shouldGetCorrectSegmentString() {
-        final SessionSegmentWithHeaders segment = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        assertEquals("SessionSegmentWithHeaders(id=0, name=test.0)", 
segment.toString());
-    }
-
-    @Test
-    public void shouldCloseAllOpenSegments() {
-        final SessionSegmentWithHeaders first = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        final SessionSegmentWithHeaders second = 
segments.getOrCreateSegmentIfLive(1, context, -1L);
-        final SessionSegmentWithHeaders third = 
segments.getOrCreateSegmentIfLive(2, context, -1L);
-        segments.close();
-
-        assertFalse(first.isOpen());
-        assertFalse(second.isOpen());
-        assertFalse(third.isOpen());
+    public void shouldCreateSegmentsOfCorrectType() {
+        final SessionSegmentWithHeaders segment = 
segments.getOrCreateSegment(0, context);
+        assertNotNull(segment);
+        assertInstanceOf(SessionSegmentWithHeaders.class, segment);
+        assertEquals(0L, segment.id());
+        assertEquals("testStore.0", segment.name());
     }
 
     @Test
     public void shouldOpenExistingSegments() {

Review Comment:
   Again: "same" test, but different test logic now



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/TestSegment.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import org.rocksdb.WriteBatchInterface;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Simple in-memory segment implementation for testing AbstractSegments.
+ */
+class TestSegment implements Segment {
+    final long id;
+    private final String name;
+    private boolean open = false;
+    private File dbDir;
+
+    TestSegment(final String name, final long id) {
+        this.name = name;
+        this.id = id;
+    }
+
+    public long id() {
+        return id;
+    }
+
+    public void destroy() throws IOException {
+        if (dbDir != null && dbDir.exists()) {
+            deleteDirectory(dbDir);
+        }
+    }
+
+    public void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void openDB(final Map<String, Object> configs, final File stateDir) 
{
+        if (stateDir != null) {
+            final String storeName = name.substring(0, name.indexOf('.'));
+            final File storeDir = new File(stateDir, storeName);
+            dbDir = new File(storeDir, name);
+            if (!dbDir.exists()) {
+                dbDir.mkdirs();
+            }
+        }
+        open = true;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        if (context.stateDir() != null) {
+            // Extract store name from segment name (e.g., "test.0" -> "test")
+            final String storeName = name.substring(0, name.indexOf('.'));
+            final File storeDir = new File(context.stateDir(), storeName);
+            dbDir = new File(storeDir, name);
+            if (!dbDir.exists()) {
+                dbDir.mkdirs();
+            }
+        }
+        open = true;
+    }
+
+    // no need to implement KeyValueStore methods

Review Comment:
   Below is just boilerplate to make it compile... :( 



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java:
##########
@@ -74,283 +64,41 @@ public void close() {
     }
 
     @Test
-    public void shouldGetSegmentIdsFromTimestamp() {
-        assertEquals(0, segments.segmentId(0));
-        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
-        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
-        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
-    }
-
-    @Test
-    public void shouldBaseSegmentIntervalOnRetentionAndNumSegments() {
-        final TimestampedSegments segments =
-            new TimestampedSegments("test", METRICS_SCOPE, 8 * 
SEGMENT_INTERVAL, 2 * SEGMENT_INTERVAL);
-        assertEquals(0, segments.segmentId(0));
-        assertEquals(0, segments.segmentId(SEGMENT_INTERVAL));
-        assertEquals(1, segments.segmentId(2 * SEGMENT_INTERVAL));
-    }
-
-    @Test
-    public void shouldGetSegmentNameFromId() {
-        assertEquals("test.0", segments.segmentName(0));
-        assertEquals("test." + SEGMENT_INTERVAL, segments.segmentName(1));
-        assertEquals("test." + 2 * SEGMENT_INTERVAL, segments.segmentName(2));
-    }
-
-    @Test
-    public void shouldCreateSegments() {
-        final TimestampedSegment segment1 = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        final TimestampedSegment segment2 = 
segments.getOrCreateSegmentIfLive(1, context, -1L);
-        final TimestampedSegment segment3 = 
segments.getOrCreateSegmentIfLive(2, context, -1L);
-        assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
-        assertTrue(new File(context.stateDir(), "test/test." + 
SEGMENT_INTERVAL).isDirectory());
-        assertTrue(new File(context.stateDir(), "test/test." + 2 * 
SEGMENT_INTERVAL).isDirectory());
-        assertTrue(segment1.isOpen());
-        assertTrue(segment2.isOpen());
-        assertTrue(segment3.isOpen());
-    }
-
-    @Test
-    public void shouldNotCreateSegmentThatIsAlreadyExpired() {
-        final long streamTime = updateStreamTimeAndCreateSegment(7);
-        assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
-        assertFalse(new File(context.stateDir(), "test/test.0").exists());
-    }
-
-    @Test
-    public void shouldCleanupSegmentsThatHaveExpired() {
-        final TimestampedSegment segment1 = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        final TimestampedSegment segment2 = 
segments.getOrCreateSegmentIfLive(1, context, -1L);
-        final TimestampedSegment segment3 = 
segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);
-        assertFalse(segment1.isOpen());
-        assertFalse(segment2.isOpen());
-        assertTrue(segment3.isOpen());
-        assertFalse(new File(context.stateDir(), "test/test.0").exists());
-        assertFalse(new File(context.stateDir(), "test/test." + 
SEGMENT_INTERVAL).exists());
-        assertTrue(new File(context.stateDir(), "test/test." + 7 * 
SEGMENT_INTERVAL).exists());
-    }
-
-    @Test
-    public void shouldGetSegmentForTimestamp() {
-        final TimestampedSegment segment = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        segments.getOrCreateSegmentIfLive(1, context, -1L);
-        assertEquals(segment, segments.segmentForTimestamp(0L));
-    }
-
-    @Test
-    public void shouldGetCorrectSegmentString() {
-        final TimestampedSegment segment = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        assertEquals("TimestampedSegment(id=0, name=test.0)", 
segment.toString());
-    }
-
-    @Test
-    public void shouldCloseAllOpenSegments() {
-        final TimestampedSegment first = segments.getOrCreateSegmentIfLive(0, 
context, -1L);
-        final TimestampedSegment second = segments.getOrCreateSegmentIfLive(1, 
context, -1L);
-        final TimestampedSegment third = segments.getOrCreateSegmentIfLive(2, 
context, -1L);
-        segments.close();
-
-        assertFalse(first.isOpen());
-        assertFalse(second.isOpen());
-        assertFalse(third.isOpen());
+    public void shouldCreateSegmentsOfCorrectType() {

Review Comment:
   new test



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java:
##########
@@ -66,132 +64,41 @@ public void close() {
     }
 
     @Test
-    public void shouldGetSegmentIdsFromTimestamp() {
-        assertEquals(0, segments.segmentId(0));
-        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
-        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
-        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
-    }
-
-    @Test
-    public void shouldGetSegmentNameFromId() {
-        assertEquals("test.0", segments.segmentName(0));
-        assertEquals("test." + SEGMENT_INTERVAL, segments.segmentName(1));
-        assertEquals("test." + 2 * SEGMENT_INTERVAL, segments.segmentName(2));
-    }
-
-    @Test
-    public void shouldCreateSegments() {
-        final TimestampedSegmentWithHeaders segment1 = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        final TimestampedSegmentWithHeaders segment2 = 
segments.getOrCreateSegmentIfLive(1, context, -1L);
-        final TimestampedSegmentWithHeaders segment3 = 
segments.getOrCreateSegmentIfLive(2, context, -1L);
-
-        assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
-        assertTrue(new File(context.stateDir(), "test/test." + 
SEGMENT_INTERVAL).isDirectory());
-        assertTrue(new File(context.stateDir(), "test/test." + 2 * 
SEGMENT_INTERVAL).isDirectory());
-        assertTrue(segment1.isOpen());
-        assertTrue(segment2.isOpen());
-        assertTrue(segment3.isOpen());
-    }
-
-    @Test
-    public void shouldNotCreateSegmentThatIsAlreadyExpired() {
-        final long streamTime = updateStreamTimeAndCreateSegment(7);
-        assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
-        assertFalse(new File(context.stateDir(), "test/test.0").exists());
-    }
-
-    @Test
-    public void shouldCleanupSegmentsThatHaveExpired() {
-        final TimestampedSegmentWithHeaders segment1 = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        final TimestampedSegmentWithHeaders segment2 = 
segments.getOrCreateSegmentIfLive(1, context, -1L);
-        final TimestampedSegmentWithHeaders segment3 = 
segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);
-
-        assertFalse(segment1.isOpen());
-        assertFalse(segment2.isOpen());
-        assertTrue(segment3.isOpen());
-        assertFalse(new File(context.stateDir(), "test/test.0").exists());
-        assertFalse(new File(context.stateDir(), "test/test." + 
SEGMENT_INTERVAL).exists());
-        assertTrue(new File(context.stateDir(), "test/test." + 7 * 
SEGMENT_INTERVAL).exists());
-    }
-
-    @Test
-    public void shouldGetSegmentForTimestamp() {
-        final TimestampedSegmentWithHeaders segment = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        segments.getOrCreateSegmentIfLive(1, context, -1L);
-        assertEquals(segment, segments.segmentForTimestamp(0L));
-    }
-
-    @Test
-    public void shouldGetCorrectSegmentString() {
-        final TimestampedSegmentWithHeaders segment = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        assertEquals("TimestampedSegmentWithHeaders(id=0, name=test.0)", 
segment.toString());
-    }
-
-    @Test
-    public void shouldCloseAllOpenSegments() {
-        final TimestampedSegmentWithHeaders first = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        final TimestampedSegmentWithHeaders second = 
segments.getOrCreateSegmentIfLive(1, context, -1L);
-        final TimestampedSegmentWithHeaders third = 
segments.getOrCreateSegmentIfLive(2, context, -1L);
-        segments.close();
-
-        assertFalse(first.isOpen());
-        assertFalse(second.isOpen());
-        assertFalse(third.isOpen());
+    public void shouldCreateSegmentsOfCorrectType() {

Review Comment:
   new test



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java:
##########
@@ -66,132 +64,41 @@ public void close() {
     }
 
     @Test
-    public void shouldGetSegmentIdsFromTimestamp() {

Review Comment:
   Same. Redundant test unified in `AbstractSegmentsTest`



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java:
##########
@@ -74,283 +64,41 @@ public void close() {
     }
 
     @Test
-    public void shouldGetSegmentIdsFromTimestamp() {
-        assertEquals(0, segments.segmentId(0));
-        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
-        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
-        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
-    }
-
-    @Test
-    public void shouldBaseSegmentIntervalOnRetentionAndNumSegments() {
-        final TimestampedSegments segments =
-            new TimestampedSegments("test", METRICS_SCOPE, 8 * 
SEGMENT_INTERVAL, 2 * SEGMENT_INTERVAL);
-        assertEquals(0, segments.segmentId(0));
-        assertEquals(0, segments.segmentId(SEGMENT_INTERVAL));
-        assertEquals(1, segments.segmentId(2 * SEGMENT_INTERVAL));
-    }
-
-    @Test
-    public void shouldGetSegmentNameFromId() {
-        assertEquals("test.0", segments.segmentName(0));
-        assertEquals("test." + SEGMENT_INTERVAL, segments.segmentName(1));
-        assertEquals("test." + 2 * SEGMENT_INTERVAL, segments.segmentName(2));
-    }
-
-    @Test
-    public void shouldCreateSegments() {
-        final TimestampedSegment segment1 = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        final TimestampedSegment segment2 = 
segments.getOrCreateSegmentIfLive(1, context, -1L);
-        final TimestampedSegment segment3 = 
segments.getOrCreateSegmentIfLive(2, context, -1L);
-        assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
-        assertTrue(new File(context.stateDir(), "test/test." + 
SEGMENT_INTERVAL).isDirectory());
-        assertTrue(new File(context.stateDir(), "test/test." + 2 * 
SEGMENT_INTERVAL).isDirectory());
-        assertTrue(segment1.isOpen());
-        assertTrue(segment2.isOpen());
-        assertTrue(segment3.isOpen());
-    }
-
-    @Test
-    public void shouldNotCreateSegmentThatIsAlreadyExpired() {
-        final long streamTime = updateStreamTimeAndCreateSegment(7);
-        assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
-        assertFalse(new File(context.stateDir(), "test/test.0").exists());
-    }
-
-    @Test
-    public void shouldCleanupSegmentsThatHaveExpired() {
-        final TimestampedSegment segment1 = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        final TimestampedSegment segment2 = 
segments.getOrCreateSegmentIfLive(1, context, -1L);
-        final TimestampedSegment segment3 = 
segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);
-        assertFalse(segment1.isOpen());
-        assertFalse(segment2.isOpen());
-        assertTrue(segment3.isOpen());
-        assertFalse(new File(context.stateDir(), "test/test.0").exists());
-        assertFalse(new File(context.stateDir(), "test/test." + 
SEGMENT_INTERVAL).exists());
-        assertTrue(new File(context.stateDir(), "test/test." + 7 * 
SEGMENT_INTERVAL).exists());
-    }
-
-    @Test
-    public void shouldGetSegmentForTimestamp() {
-        final TimestampedSegment segment = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        segments.getOrCreateSegmentIfLive(1, context, -1L);
-        assertEquals(segment, segments.segmentForTimestamp(0L));
-    }
-
-    @Test
-    public void shouldGetCorrectSegmentString() {
-        final TimestampedSegment segment = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
-        assertEquals("TimestampedSegment(id=0, name=test.0)", 
segment.toString());
-    }
-
-    @Test
-    public void shouldCloseAllOpenSegments() {
-        final TimestampedSegment first = segments.getOrCreateSegmentIfLive(0, 
context, -1L);
-        final TimestampedSegment second = segments.getOrCreateSegmentIfLive(1, 
context, -1L);
-        final TimestampedSegment third = segments.getOrCreateSegmentIfLive(2, 
context, -1L);
-        segments.close();
-
-        assertFalse(first.isOpen());
-        assertFalse(second.isOpen());
-        assertFalse(third.isOpen());
+    public void shouldCreateSegmentsOfCorrectType() {
+        final TimestampedSegment segment = segments.getOrCreateSegment(0, 
context);
+        assertNotNull(segment);
+        assertInstanceOf(TimestampedSegment.class, segment);
+        assertEquals(0L, segment.id());
+        assertEquals("testStore.0", segment.name());
     }
 
     @Test
     public void shouldOpenExistingSegments() {

Review Comment:
   Updated test



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to