This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 1b245e633 Enhance segment lifecycle (#1151)
1b245e633 is described below

commit 1b245e633c624aa7ffa8afcfa355e90c88bd235c
Author: mrproliu <[email protected]>
AuthorDate: Wed Jun 3 13:59:28 2026 +0800

    Enhance segment lifecycle (#1151)
    
    * Enhance segment lifecycle
---
 CHANGES.md                                         |   1 +
 banyand/internal/storage/index.go                  |   6 +-
 banyand/internal/storage/rotation.go               |   2 +-
 banyand/internal/storage/segment.go                | 317 ++++-----
 .../internal/storage/segment_idle_reclaim_test.go  | 708 +++++++++++++++++++++
 .../storage/segment_lifecycle_integration_test.go  | 218 +++++++
 banyand/internal/storage/segment_test.go           | 121 ++--
 .../storage/snapshot_closed_segment_test.go        | 121 +++-
 banyand/internal/storage/tsdb.go                   |  22 +-
 banyand/internal/storage/tsdb_test.go              |   6 +-
 banyand/measure/write_data_segmentref_test.go      |  17 +-
 banyand/stream/write_data_segmentref_test.go       |  17 +-
 banyand/trace/write_data_segmentref_test.go        |  50 +-
 13 files changed, 1316 insertions(+), 290 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index ccb1a0ed5..49262221e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -58,6 +58,7 @@ Release Notes.
 - Support displaying a measure's indexed tags in the dump tool, resolved per part so peak memory is bounded by the part rather than a segment-wide series map.
 - Snapshot/backup and data inspection no longer reopen idle-closed segments, avoiding cold-segment nil-index panics and index lock-file churn.
 - Add opt-in vectorized measure query tracing over raw-frame distributed queries, including a trace envelope and fixed trace-label vocabulary.
+- Enhance segment lifecycle: `refCount` now counts only active users, decoupled from "open" (`index != nil`), adding a "dormant" state (open, `refCount == 0`). A `DecRef` to zero no longer closes a segment; idle reclaim and retention delete act only at `refCount == 0`, so an in-flight snapshot/inspect is no longer torn down mid-operation (fixing the cold-node nil-index panic and bluge lock churn) while idle segments still release their bluge writers.
 
 ### Bug Fixes
 
diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go
index 3ff0ae41c..5b9f17737 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -37,9 +37,9 @@ import (
 const seriesIndexDirName = "sidx"
 
 func (s *segment[T, O]) IndexDB() IndexDB {
-       // Snapshot s.index into a local: the field can be flipped to nil by
-       // performCleanup at any time, so a second read could see a different
-       // value. Returning the local also avoids boxing a nil *seriesIndex
+       // Snapshot s.index into a local: the field can be flipped to nil by a
+       // concurrent reclaim (closeIfIdle / performDelete) at any time, so a 
second
+       // read could see a different value. Returning the local also avoids 
boxing a nil *seriesIndex
        // into a typed-nil IndexDB interface, which would defeat the
        // `if indexDB == nil` guard on the caller side.
        idx := s.index
diff --git a/banyand/internal/storage/rotation.go b/banyand/internal/storage/rotation.go
index 2b680ddbb..332e1880a 100644
--- a/banyand/internal/storage/rotation.go
+++ b/banyand/internal/storage/rotation.go
@@ -107,7 +107,7 @@ func (d *database[T, O]) startRotationTask() error {
                                                }()
                                                for i := range ss {
                                                        if ss[i].End.UnixNano() 
< ts {
-                                                               
ss[i].index.store.Reset()
+                                                               
ss[i].resetIndex()
                                                        }
                                                }
                                                latest := ss[len(ss)-1]
diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go
index 7872cef22..0c941e0c8 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -113,6 +113,11 @@ func (sc *segmentController[T, O]) openSegment(ctx 
context.Context, startTime, e
        }
        s.l = logger.Fetch(ctx, s.String())
        s.lastAccessed.Store(time.Now().UnixNano())
+       // New segments start dormant: resources open (index!=nil) but 
refCount==0,
+       // so an idle, unused segment is reclaimable without any caller 
releasing a
+       // baseline reference.
+       s.mu.Lock()
+       defer s.mu.Unlock()
        return s, s.initialize(ctx)
 }
 
@@ -159,61 +164,44 @@ func (s *segment[T, O]) TablesWithShardIDs() (tt []T, 
shardIDs []common.ShardID,
        return tt, shardIDs, cc
 }
 
-// incRef acquires one reference on the segment, opening it via initialize
-// if it is currently closed (refCount<=0). On return, the caller is
-// guaranteed that the segment is alive (s.index != nil, shards loaded) and
-// will stay alive until the matching DecRef.
+// incRef acquires one active reference on the segment, opening its resources
+// (series index + shards) via acquire if it is currently closed. On return the
+// segment is guaranteed alive (s.index != nil) until the matching DecRef.
 //
-// incRef intentionally does NOT touch lastAccessed: the idle-segment
-// reclaimer (closeIdleSegments) decides eligibility from that field, and
-// the rotation-tick scan iterates every segment via incRef on each tick.
-// Refreshing lastAccessed here would defeat the reclaimer for any segment
-// outside the active write window. Callers that represent a real read or
-// write touch must bump lastAccessed explicitly (see selectSegments and
+// refCount counts only active users and is orthogonal to whether the segment 
is
+// open (index != nil): a segment with refCount==0 but index!=nil is "dormant"
+// -- open and reusable, yet eligible for the idle reclaimer. incRef therefore
+// does NOT touch lastAccessed: the reclaimer (closeIdleSegments) decides
+// eligibility from that field and the rotation-tick scan iterates every 
segment
+// via incRef on each tick, so refreshing it here would defeat the reclaimer.
+// Real read/write touches bump lastAccessed explicitly (see selectSegments and
 // createSegment); housekeeping iterators must not.
 //
-// The CAS loop closes a TOCTOU race that the older "Load + AddInt32"
-// shape suffered from: a goroutine could read refCount=1, race against a
-// concurrent DecRef that drives it to 0 and triggers performCleanup
-// (which sets s.index=nil and tears down shards), and then AddInt32 it
-// back to 1. The caller would walk away with refCount=1 -- formally a
-// valid reference -- but pointing at an already-cleaned-up segment, so
-// every subsequent IndexDB() / Tables() call would observe zero state.
-//
-// The CAS variant retries whenever a concurrent DecRef beats us to the
-// counter: if CAS fails, we re-Load and either succeed at the new value
-// or fall through to initialize() to reopen the segment under s.mu.
-// Symmetric to the CAS loop in DecRef, which guarantees the dual: a
-// DecRef that drives refCount to 0 sees no concurrent +1 sneak in
-// before performCleanup runs.
+// The fast path CAS-bumps only while refCount>0, so it can never resurrect a
+// segment the idle reclaimer is closing (closeIfIdle / performDelete act only
+// at refCount==0 under s.mu). When refCount<=0 it falls to the s.mu slow path
+// (acquire), which serializes the reopen and the 0->1 transition against them.
 func (s *segment[T, O]) incRef(ctx context.Context) error {
        for {
                current := atomic.LoadInt32(&s.refCount)
                if current <= 0 {
-                       // Either the segment was never opened or DecRef just 
drove
-                       // refCount to 0; reopen under the mutex.
-                       return s.initialize(ctx)
+                       // Dormant (index!=nil) or closed (index==nil); 
reopen/acquire under
+                       // the mutex.
+                       return s.acquire(ctx)
                }
-               // CAS so a concurrent DecRef cannot flip refCount to 0 and run
-               // performCleanup between our Load and our increment. On failure
-               // we re-Load and re-evaluate the branch.
+               // CAS bumps only while refCount>0, so it can never resurrect a 
segment
+               // the idle reclaimer is closing (closeIfIdle acts only at 
refCount==0).
                if atomic.CompareAndSwapInt32(&s.refCount, current, current+1) {
                        return nil
                }
        }
 }
 
+// initialize opens the segment's series index and shards if not already
+// open, leaving the segment dormant (index!=nil) WITHOUT changing refCount.
+// Must be called with s.mu held.
 func (s *segment[T, O]) initialize(ctx context.Context) error {
-       s.mu.Lock()
-       defer s.mu.Unlock()
-
-       if atomic.LoadInt32(&s.refCount) > 0 {
-               // Another goroutine reopened the segment while we waited for 
the
-               // mutex. Add the +1 our caller (incRef) skipped before entering
-               // the slow path; otherwise concurrent reopens would share one
-               // refCount and the first DecRef would cleanup while we are 
still
-               // using it.
-               atomic.AddInt32(&s.refCount, 1)
+       if s.index != nil {
                return nil
        }
 
@@ -234,85 +222,145 @@ func (s *segment[T, O]) initialize(ctx context.Context) 
error {
                s.index = nil
                return errors.Wrap(errOpenDatabase, errors.WithMessage(err, 
"load shards failed").Error())
        }
-       atomic.StoreInt32(&s.refCount, 1)
 
        s.l.Info().Stringer("seg", s).Msg("segment initialized")
        return nil
 }
 
-func (s *segment[T, O]) collectMetrics() {
+// acquire is the slow path of incRef: under s.mu it opens the segment's
+// resources via initialize if closed, then adds the caller's reference (0->1).
+// Holding s.mu serializes the reopen and the 0->1 transition against
+// closeIfIdle / performDelete, which act only at refCount==0 under the same 
lock.
+func (s *segment[T, O]) acquire(ctx context.Context) error {
+       s.mu.Lock()
+       defer s.mu.Unlock()
+       if atomic.LoadInt32(&s.refCount) > 0 {
+               // Another goroutine acquired the segment while we waited for 
the mutex.
+               atomic.AddInt32(&s.refCount, 1)
+               return nil
+       }
+       if atomic.LoadUint32(&s.mustBeDeleted) != 0 {
+               // Flagged for deletion with no active reference: performDelete 
has run
+               // (or is about to, blocked on s.mu) and the directory may 
already be
+               // gone. Refuse to reopen so we never open resources on a 
removed dir.
+               // A still-referenced (refCount>0) flagged segment is handled 
by the
+               // fast-bump above, since its directory stays until the last 
DecRef.
+               return ErrSegmentClosed
+       }
+       if err := s.initialize(ctx); err != nil {
+               return err
+       }
+       atomic.StoreInt32(&s.refCount, 1)
+       return nil
+}
+
+// resetIndex resets the series-index cache if the segment is open, holding
+// s.mu.RLock so the index pointer read is synchronized against a concurrent
+// close (which clears it under the write lock).
+func (s *segment[T, O]) resetIndex() {
+       s.mu.RLock()
+       defer s.mu.RUnlock()
+       if s.index != nil {
+               s.index.store.Reset()
+       }
+}
+
+// collectOpenMetrics gathers the segment's shard-table and series-index 
metrics
+// if it is open, holding s.mu.RLock so a concurrent reclaim (which takes the
+// write lock) cannot tear the resources down mid-collection. Returns whether
+// the segment was open.
+func (s *segment[T, O]) collectOpenMetrics(shardMetrics Metrics) bool {
        s.mu.RLock()
        defer s.mu.RUnlock()
        if s.index == nil {
-               return
+               return false
+       }
+       if sLst := s.sLst.Load(); sLst != nil {
+               for _, sh := range *sLst {
+                       sh.table.Collect(shardMetrics)
+               }
        }
        s.index.store.CollectMetrics(s.index.p.SegLabelValues()...)
+       return true
 }
 
+// DecRef releases one active reference. Reaching refCount==0 does NOT close 
the
+// segment -- it becomes dormant (open, reclaimable by the idle reclaimer). The
+// sole exception: a segment flagged for deletion (mustBeDeleted) while held 
runs
+// its deferred delete on the last DecRef.
 func (s *segment[T, O]) DecRef() {
-       shouldCleanup := false
-
-       if atomic.LoadInt32(&s.refCount) <= 0 && 
atomic.LoadUint32(&s.mustBeDeleted) != 0 {
-               shouldCleanup = true
-       } else {
-               for {
-                       current := atomic.LoadInt32(&s.refCount)
-                       if current <= 0 {
-                               return
-                       }
-
-                       if atomic.CompareAndSwapInt32(&s.refCount, current, 
current-1) {
-                               shouldCleanup = current == 1
-                               break
+       for {
+               current := atomic.LoadInt32(&s.refCount)
+               if current <= 0 {
+                       // Already dormant; nothing to release. Deletion of a 
dormant
+                       // segment is driven by delete()/performDelete, not 
here.
+                       return
+               }
+               if atomic.CompareAndSwapInt32(&s.refCount, current, current-1) {
+                       if current == 1 && atomic.LoadUint32(&s.mustBeDeleted) 
!= 0 {
+                               s.performDelete()
                        }
+                       return
                }
        }
-
-       if !shouldCleanup {
-               return
-       }
-
-       s.performCleanup()
 }
 
-func (s *segment[T, O]) performCleanup() {
-       s.mu.Lock()
-       defer s.mu.Unlock()
-
-       if atomic.LoadInt32(&s.refCount) > 0 && 
atomic.LoadUint32(&s.mustBeDeleted) == 0 {
-               return
-       }
-
-       deletePath := ""
-       if atomic.LoadUint32(&s.mustBeDeleted) != 0 {
-               deletePath = s.location
-       }
-
+// closeResourcesLocked closes the segment's series index and shards but keeps
+// its on-disk directory, leaving it closed (index==nil) and reopenable. Must 
be
+// called with s.mu held. Idempotent.
+func (s *segment[T, O]) closeResourcesLocked() {
        if s.index != nil {
                if err := s.index.Close(); err != nil {
                        s.l.Panic().Err(err).Msg("failed to close the series 
index")
                }
                s.index = nil
        }
-
-       sLst := s.sLst.Load()
-       if sLst != nil {
+       if sLst := s.sLst.Load(); sLst != nil {
                for _, shard := range *sLst {
                        shard.close()
                }
-               if deletePath == "" {
-                       s.sLst.Store(&[]*shard[T]{})
-               }
+               s.sLst.Store(&[]*shard[T]{})
+       }
+}
+
+// closeIfIdle releases a dormant segment's resources (index writer + shards) 
if
+// it is open, unreferenced, not flagged for deletion, and idle past the
+// threshold. The directory is kept so the segment reopens on next access.
+// Returns whether it closed the segment.
+func (s *segment[T, O]) closeIfIdle(idleThreshold int64) bool {
+       s.mu.Lock()
+       defer s.mu.Unlock()
+       if s.index == nil || atomic.LoadInt32(&s.refCount) != 0 ||
+               atomic.LoadUint32(&s.mustBeDeleted) != 0 || 
s.lastAccessed.Load() >= idleThreshold {
+               return false
        }
+       s.closeResourcesLocked()
+       return true
+}
 
-       if deletePath != "" {
-               s.lfs.MustRMAll(deletePath)
+// performDelete closes the segment's resources and removes its directory. It
+// runs once the last active reference is dropped, or immediately from delete()
+// when the segment is already dormant. Idempotent. Must NOT be called with 
s.mu
+// held.
+func (s *segment[T, O]) performDelete() {
+       s.mu.Lock()
+       defer s.mu.Unlock()
+       if atomic.LoadInt32(&s.refCount) > 0 {
+               // A reference was acquired before we ran; the next DecRef to 
zero
+               // retries the delete.
+               return
        }
+       s.closeResourcesLocked()
+       s.lfs.MustRMAll(s.location)
 }
 
+// delete flags the segment for deletion. If it is dormant (no active 
reference)
+// the delete happens now; otherwise it is deferred to the last DecRef.
 func (s *segment[T, O]) delete() {
        atomic.StoreUint32(&s.mustBeDeleted, 1)
-       s.DecRef()
+       if atomic.LoadInt32(&s.refCount) == 0 {
+               s.performDelete()
+       }
 }
 
 // Location returns the on-disk directory of the segment.
@@ -327,8 +375,8 @@ func (s *segment[T, O]) Location() string {
 func (s *segment[T, O]) SeriesIndexStats() (int64, int64) {
        s.mu.RLock()
        if s.index != nil {
-               // Hold the read lock while reading live stats so a concurrent
-               // performCleanup cannot close the index out from under us.
+               // Hold the read lock while reading live stats so a concurrent 
reclaim
+               // (closeIfIdle / performDelete) cannot close the index out 
from under us.
                defer s.mu.RUnlock()
                return s.index.Stats()
        }
@@ -613,8 +661,17 @@ func (sc *segmentController[T, O]) segments(ctx 
context.Context, reopenClosed bo
                                return nil, err
                        }
                } else {
-                       if atomic.LoadInt32(&sc.lst[i].refCount) > 0 {
-                               atomic.AddInt32(&sc.lst[i].refCount, 1)
+                       // Pin only if already open (CAS while refCount>0); a 
dormant/closed
+                       // segment is returned unbumped so the idle reclaimer 
is never raced
+                       // into resurrecting it, and the caller's DecRef on it 
is a no-op.
+                       for {
+                               current := atomic.LoadInt32(&sc.lst[i].refCount)
+                               if current <= 0 {
+                                       break
+                               }
+                               if 
atomic.CompareAndSwapInt32(&sc.lst[i].refCount, current, current+1) {
+                                       break
+                               }
                        }
                }
                r[i] = sc.lst[i]
@@ -634,62 +691,24 @@ func (sc *segmentController[T, O]) copySegments() 
[]*segment[T, O] {
        return r
 }
 
+// closeIdleSegments releases the resources of every dormant, idle segment
+// (index!=nil, refCount==0, not flagged for deletion, lastAccessed past the
+// idle threshold), keeping their directories so they reopen on next access.
+// Each segment is decided under its own s.mu, where the refCount==0 check is
+// stable against concurrent acquire (which takes the same lock to go 0->1).
 func (sc *segmentController[T, O]) closeIdleSegments() int {
-       maxIdleTime := sc.idleTimeout
-
-       now := time.Now().UnixNano()
-       idleThreshold := now - maxIdleTime.Nanoseconds()
-
-       // Take our own snapshot rather than going through segments(false): that
-       // helper's "Load>0 then Add" bump is non-atomic and can resurrect a
-       // segment that DecRef just cleaned up, and its caller has no way to 
tell
-       // which entries it actually bumped. closeIdleSegments must know, 
because
-       // it issues an unconditional DecRef per entry to release the bump; if a
-       // concurrent selectSegments reopens an entry we didn't bump, that 
DecRef
-       // would drop the query's only live ref and trigger performCleanup under
-       // active use. Iterate sc.lst under RLock and CAS-bump each open 
segment;
-       // closed segments are recorded as "not bumped" and skipped in the loop.
+       idleThreshold := time.Now().UnixNano() - sc.idleTimeout.Nanoseconds()
        sc.RLock()
-       segs := make([]*segment[T, O], 0, len(sc.lst))
-       bumped := make([]bool, 0, len(sc.lst))
-       refAtBump := make([]int32, 0, len(sc.lst))
-       for _, s := range sc.lst {
-               didBump := false
-               var snapRef int32
-               for {
-                       current := atomic.LoadInt32(&s.refCount)
-                       if current <= 0 {
-                               break
-                       }
-                       if atomic.CompareAndSwapInt32(&s.refCount, current, 
current+1) {
-                               didBump = true
-                               snapRef = current
-                               break
-                       }
-               }
-               segs = append(segs, s)
-               bumped = append(bumped, didBump)
-               refAtBump = append(refAtBump, snapRef)
-       }
+       segs := make([]*segment[T, O], len(sc.lst))
+       copy(segs, sc.lst)
        sc.RUnlock()
 
        closedCount := 0
-       for i, seg := range segs {
-               if !bumped[i] {
-                       continue
-               }
-               // Only close when the snapshot proved refCount==1 (baseline 
only):
-               // a higher value means other callers hold active references, 
and
-               // decrementing past our bump would steal one of their refs,
-               // potentially triggering performCleanup under active use on a
-               // subsequent tick.
-               if refAtBump[i] == 1 && seg.lastAccessed.Load() < idleThreshold 
{
-                       seg.DecRef()
+       for _, s := range segs {
+               if s.closeIfIdle(idleThreshold) {
                        closedCount++
                }
-               seg.DecRef()
        }
-
        return closedCount
 }
 
@@ -890,11 +909,11 @@ func (sc *segmentController[T, O]) remove(deadline 
time.Time) (hasSegment bool,
 // Queries should exclude such fully expired segments to avoid serving 
TTL-expired
 // data; partially expired segments remain visible until their end passes the 
deadline.
 func (sc *segmentController[T, O]) getRetentionDeadline() time.Time {
-       return time.Now().Local().Add(-sc.getOptions().TTL.estimatedDuration())
+       return 
sc.clock.Now().Local().Add(-sc.getOptions().TTL.estimatedDuration())
 }
 
 func (sc *segmentController[T, O]) getExpiredSegmentsTimeRange() 
*timestamp.TimeRange {
-       deadline := time.Now().Local().Add(-sc.opts.TTL.estimatedDuration())
+       deadline := sc.clock.Now().Local().Add(-sc.opts.TTL.estimatedDuration())
        timeRange := &timestamp.TimeRange{
                IncludeStart: true,
                IncludeEnd:   false,
@@ -913,7 +932,7 @@ func (sc *segmentController[T, O]) 
getExpiredSegmentsTimeRange() *timestamp.Time
 }
 
 func (sc *segmentController[T, O]) deleteExpiredSegments(segmentSuffixes 
[]string) int64 {
-       deadline := time.Now().Local().Add(-sc.opts.TTL.estimatedDuration())
+       deadline := sc.clock.Now().Local().Add(-sc.opts.TTL.estimatedDuration())
        var count int64
        ss, _ := sc.segments(context.Background(), false)
        sc.l.Info().Str("segment_suffixes", fmt.Sprintf("%s", segmentSuffixes)).
@@ -958,7 +977,7 @@ func (sc *segmentController[T, O]) removeSeg(segID 
segmentID) {
 }
 
 // peekOldestSegmentEndTime returns the end time of the oldest segment.
-// It returns the zero time and false if no segments exist or all segments 
have refCount <= 0.
+// It returns the zero time and false if no segments exist or the oldest 
segment is closed.
 func (sc *segmentController[T, O]) peekOldestSegmentEndTime() (time.Time, 
bool) {
        sc.RLock()
        defer sc.RUnlock()
@@ -971,8 +990,10 @@ func (sc *segmentController[T, O]) 
peekOldestSegmentEndTime() (time.Time, bool)
        // so the first one is the oldest
        oldest := sc.lst[0]
 
-       // Only return segments that are still active (have references > 0)
-       if atomic.LoadInt32(&oldest.refCount) > 0 {
+       oldest.mu.RLock()
+       open := oldest.index != nil
+       oldest.mu.RUnlock()
+       if open {
                return oldest.End, true
        }
 
@@ -1004,10 +1025,16 @@ func (sc *segmentController[T, O]) removeOldest() 
(bool, error) {
 func (sc *segmentController[T, O]) close() {
        sc.Lock()
        defer sc.Unlock()
+       // Full shutdown: release every segment's resources regardless of 
refCount
+       // (DecRef no longer closes on reaching zero). A segment flagged for 
deletion
+       // also has its directory removed; the rest keep their data on disk.
        for _, s := range sc.lst {
-               for atomic.LoadInt32(&s.refCount) > 0 {
-                       s.DecRef()
+               s.mu.Lock()
+               s.closeResourcesLocked()
+               if atomic.LoadUint32(&s.mustBeDeleted) != 0 {
+                       s.lfs.MustRMAll(s.location)
                }
+               s.mu.Unlock()
        }
        sc.lst = sc.lst[:0]
        if sc.metrics != nil {
diff --git a/banyand/internal/storage/segment_idle_reclaim_test.go b/banyand/internal/storage/segment_idle_reclaim_test.go
new file mode 100644
index 000000000..10d95e869
--- /dev/null
+++ b/banyand/internal/storage/segment_idle_reclaim_test.go
@@ -0,0 +1,708 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+       "context"
+       "os"
+       "path/filepath"
+       "sync"
+       "sync/atomic"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// The first four tests below pin the two opposing invariants that any fix for
+// the cold-node snapshot panic / inspect flock must satisfy simultaneously:
+//
+//   1. An actively-held segment (one a caller obtained a live reference to,
+//      e.g. an in-flight snapshot or query) must NEVER be reclaimed by the
+//      idle reclaimer or torn down by retention. Violating this is the root
+//      cause of the production nil-index panic in the snapshot path and the
+//      "exclusive lock" flock churn.
+//
+//   2. A genuinely idle segment with no active reference must STILL be
+//      reclaimed so its bluge index writer is released -- the leak PR #1128
+//      (https://github.com/apache/skywalking-banyandb/pull/1128) fixed by
+//      removing the lastAccessed refresh from incRef.
+//
+// The two "Keeps..." tests reproduce the bug this change fixes (they FAIL on
+// the unfixed tree). The "Reclaims" / "DoesNotRefreshLastAccessed" tests guard
+// the PR #1128 behavior (they PASS on the unfixed tree and must keep passing).
+
+// newReclaimTestController builds a segment controller with the given idle
+// timeout, mirroring the setup shared across segment_test.go.
+func newReclaimTestController(
+       t *testing.T, tempDir string, idleTimeout time.Duration,
+) (*segmentController[mockTSTable, mockTSTableOpener], context.Context) {
+       t.Helper()
+       l := logger.GetLogger("test-idle-reclaim")
+       ctx := context.WithValue(context.Background(), logger.ContextKey, l)
+       ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+               return common.Position{Database: "test-db", Stage: "test-stage"}
+       })
+       opts := TSDBOpts[mockTSTable, mockTSTableOpener]{
+               TSTableCreator: func(_ fs.FileSystem, _ string, _ 
common.Position, _ *logger.Logger,
+                       _ timestamp.TimeRange, _ mockTSTableOpener, _ any,
+               ) (mockTSTable, error) {
+                       return mockTSTable{ID: common.ShardID(0)}, nil // same stub table for every shard and time range
+               },
+               ShardNum:                       2,
+               SegmentInterval:                IntervalRule{Unit: DAY, Num: 1}, // one segment per day
+               TTL:                            IntervalRule{Unit: DAY, Num: 7}, // 7-day retention (TestGetExpiredSegmentsTimeRange relies on it)
+               SeriesIndexFlushTimeoutSeconds: 10,
+               SeriesIndexCacheMaxBytes:       1024 * 1024,
+       }
+       serviceCache := NewServiceCache().(*serviceCache)
+       sc := newSegmentController[mockTSTable, mockTSTableOpener](
+               ctx,
+               tempDir,
+               l,
+               opts,
+               nil, // indexMetrics
+               nil, // metrics
+               idleTimeout, // idle threshold the reclaimer (closeIdleSegments) applies
+               
fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"), 
opts.MemoryLimit),
+               serviceCache,
+               group, // NOTE(review): package-level fixture declared outside this file, presumably in segment_test.go
+       )
+       return sc, ctx // ctx carries the test logger and position; pass it to openSegment/incRef
+}
+
+// openReclaimTestSegment opens a single day segment and registers it with the
+// controller's segment list, matching the helper inlined in segment_test.go.
+func openReclaimTestSegment(
+       ctx context.Context, t *testing.T, sc *segmentController[mockTSTable, 
mockTSTableOpener], tempDir string, start time.Time,
+) *segment[mockTSTable, mockTSTableOpener] {
+       t.Helper()
+       segmentPath := filepath.Join(tempDir, 
"segment-"+start.Format(dayFormat))
+       require.NoError(t, os.MkdirAll(segmentPath, DirPerm)) // on-disk segment directory
+       require.NoError(t, os.WriteFile(filepath.Join(segmentPath, 
metadataFilename), []byte(currentVersion), FilePerm))
+       seg, err := sc.openSegment(ctx, start, start.Add(24*time.Hour), 
segmentPath, start.Format(dayFormat), sc.groupCache)
+       require.NoError(t, err)
+       sc.Lock() // sc.lst is mutated under the controller lock
+       sc.lst = append(sc.lst, seg)
+       sc.sortLst() // keep lst ordered; peekOldestSegmentEndTime treats lst[0] as the oldest
+       sc.Unlock()
+       return seg // returned open and dormant: index!=nil, refCount==0
+}
+
+func reclaimTestDay(t *testing.T) time.Time { // returns midnight UTC of the current day (a day-segment start)
+       t.Helper()
+       now := time.Now().UTC()
+       return time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, 
time.UTC)
+}
+
+// TestCloseIdleSegments_KeepsActivelyHeldReopenedSegment reproduces the cold
+// node snapshot panic. A snapshot reopens an idle-closed cold segment through
+// segments(ctx,true) (the call in the snapshot path) and holds the reference
+// for the duration of the snapshot. Because that reopen yields refCount==1 --
+// which the old reclaimer could not distinguish from an idle segment -- and
+// segments(ctx,true) does not refresh lastAccessed, the idle reclaimer 
reclaims
+// the segment mid-snapshot and sets index=nil. The held caller then 
dereferences
+// seg.index.store and panics.
+//
+// Invariant under test (model-agnostic): while a live reference is held, the
+// segment's index must stay open. FAILS on the unfixed tree.
+func TestCloseIdleSegments_KeepsActivelyHeldReopenedSegment(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+       sc, ctx := newReclaimTestController(t, tempDir, 100*time.Millisecond)
+
+       seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+
+       // 1) Drive the segment into the idle-closed state every cold segment 
lives
+       // in: stale lastAccessed -> reclaimed -> index writer released.
+       seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+       require.Equal(t, 1, sc.closeIdleSegments()) // the one registered segment is reclaimed
+       require.Nil(t, seg.index, "precondition: the cold segment must start 
idle-closed")
+
+       // 2) The snapshot path reopens every segment via segments(ctx,true) and
+       // holds the references until the snapshot finishes (see
+       // (*database).TakeFileSnapshot).
+       held, err := sc.segments(ctx, true) // true: reopen idle-closed segments
+       require.NoError(t, err)
+       require.Len(t, held, 1)
+       defer func() {
+               for _, s := range held {
+                       s.DecRef()
+               }
+       }()
+       require.NotNil(t, seg.index, "the snapshot path must reopen the cold 
segment")
+
+       // 3) The 10-minute idle reclaimer tick fires while the snapshot is 
still
+       // holding the segment. (lastAccessed is still stale because
+       // segments(ctx,true)/incRef intentionally does not refresh it.)
+       sc.closeIdleSegments() // count ignored; the assertion below checks the outcome
+
+       // 4) The held segment MUST still be open. On the unfixed tree it has 
been
+       // reclaimed (index==nil); the next seg.index.store access -- e.g. the
+       // hard-link in the snapshot path -- then panics with a nil pointer 
dereference.
+       require.NotNil(t, seg.index,
+               "the idle reclaimer must not close a segment that the snapshot 
path is actively holding")
+}
+
+// TestDelete_KeepsIndexWhileActivelyHeld reproduces the second face of the 
same
+// defect: before the fix, retention's delete() tore the index down (and 
removed
+// the segment directory) even while a caller held a live reference, because 
the
+// teardown bypassed the refCount>0 guard once mustBeDeleted was set. The 
holder
+// was left with a use-after-free. Under the new model delete() defers to the
+// last DecRef while the segment is held.
+//
+// This is an isolated, deterministic reproduction of the segment-level defect;
+// in production it is hit as a race between retention and an in-flight reopen
+// on a cold node. FAILS on the unfixed tree.
+func TestDelete_KeepsIndexWhileActivelyHeld(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+       sc, ctx := newReclaimTestController(t, tempDir, 100*time.Millisecond)
+
+       seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+
+       // Reduce to the cold-segment shape where a reopen yields the lone 
reference
+       // (refCount==1): idle-close, then reopen as an active holder.
+       seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+       require.Equal(t, 1, sc.closeIdleSegments())
+       require.Nil(t, seg.index) // now idle-closed
+
+       require.NoError(t, seg.incRef(ctx)) // active holder, e.g. an in-flight 
query/snapshot
+       require.NotNil(t, seg.index) // incRef reopened the closed segment
+
+       // Retention marks the segment for deletion while the holder is still 
using
+       // it.
+       seg.delete() // marks for deletion; teardown must wait for the holder
+
+       require.NotNil(t, seg.index,
+               "delete() must not close the index while a caller still holds a 
reference")
+       require.DirExists(t, seg.location,
+               "delete() must not remove the segment directory while a caller 
still holds a reference")
+
+       // Once the holder releases its reference, deletion proceeds.
+       seg.DecRef() // last reference dropped: the deferred deletion runs here
+       require.Nil(t, seg.index, "the index must be released once the last 
reference is dropped")
+       require.NoDirExists(t, seg.location, "the directory must be removed 
once the last reference is dropped")
+}
+
+// TestCloseIdleSegments_ReclaimsTrulyIdleSegment guards the PR #1128 goal: a
+// segment that is open but has no active reference and has gone idle must be
+// reclaimed so its bluge index writer is released. Passes on the unfixed tree
+// and must keep passing after the fix.
+func TestCloseIdleSegments_ReclaimsTrulyIdleSegment(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+       sc, ctx := newReclaimTestController(t, tempDir, 100*time.Millisecond)
+
+       seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+       require.NotNil(t, seg.index)
+
+       // No extra reference is held; only the segment's own open state keeps 
it
+       // alive. Make it idle.
+       seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano()) // an hour ago, far past the 100ms idle timeout
+
+       require.Equal(t, 1, sc.closeIdleSegments(), "an idle, unreferenced 
segment must be reclaimed")
+       require.Nil(t, seg.index, "a reclaimed segment must release its index 
writer (PR #1128)")
+}
+
+// TestIncRef_DoesNotRefreshLastAccessed pins the invariant PR #1128 
established:
+// incRef -- used by the rotation housekeeping scan that touches every segment
+// via segments(ctx,true) on each tick -- must NOT refresh lastAccessed. If it
+// did, no segment outside the active write window would ever look idle and the
+// reclaimer could never release their bluge writers, reintroducing the exact
+// leak #1128 fixed.
+//
+// This guards against the tempting-but-wrong fix of re-adding
+// lastAccessed.Store(now) inside incRef to paper over the reclaim race.
+func TestIncRef_DoesNotRefreshLastAccessed(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+       sc, ctx := newReclaimTestController(t, tempDir, 100*time.Millisecond)
+
+       seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+
+       stale := time.Now().Add(-time.Hour).UnixNano()
+       seg.lastAccessed.Store(stale) // sentinel value that incRef must leave untouched
+
+       require.NoError(t, seg.incRef(ctx))
+       defer seg.DecRef()
+
+       require.Equal(t, stale, seg.lastAccessed.Load(),
+               "incRef must not refresh lastAccessed (would defeat the idle 
reclaimer and reintroduce PR #1128's writer leak)")
+}
+
+// TestOpenSegment_StartsDormant: a freshly opened segment starts dormant --
+// open (index!=nil) with no active reference (refCount==0).
+func TestOpenSegment_StartsDormant(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+       sc, ctx := newReclaimTestController(t, tempDir, time.Hour)
+
+       seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+       require.NotNil(t, seg.index, "a freshly opened segment is open")
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "a freshly 
opened segment has no active reference")
+}
+
+// TestCloseIdleSegments_KeepsFreshDormantSegment: a dormant but freshly-accessed
+// segment (refCount==0, lastAccessed recent) must NOT be reclaimed by the idle reclaimer.
+func TestCloseIdleSegments_KeepsFreshDormantSegment(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+       sc, ctx := newReclaimTestController(t, tempDir, time.Hour) // one-hour idle timeout: the just-opened segment is not idle
+
+       seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
+
+       require.Equal(t, 0, sc.closeIdleSegments(), "a fresh dormant segment 
must not be reclaimed")
+       require.NotNil(t, seg.index, "a fresh dormant segment must stay open")
+}
+
+// TestDecRef_ToZeroLeavesDormantAndReusable: DecRef to zero leaves the segment
+// dormant (open) and reusable; a subsequent incRef reuses it without reopening.
+func TestDecRef_ToZeroLeavesDormantAndReusable(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+       sc, ctx := newReclaimTestController(t, tempDir, time.Hour)
+
+       seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+       require.NoError(t, seg.incRef(ctx))
+       require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount))
+
+       seg.DecRef() // refCount 1 -> 0; the segment must stay open
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
+       require.NotNil(t, seg.index, "DecRef to zero keeps the segment open 
(dormant)")
+
+       require.NoError(t, seg.incRef(ctx))
+       require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount), "incRef 
reuses the dormant segment")
+       require.NotNil(t, seg.index)
+       seg.DecRef()
+}
+
+// TestReclaimedSegment_ReopensOnIncRef: a reclaimed (idle-closed) segment reopens on the next incRef.
+func TestReclaimedSegment_ReopensOnIncRef(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+       sc, ctx := newReclaimTestController(t, tempDir, 100*time.Millisecond)
+
+       seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+       seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+       require.Equal(t, 1, sc.closeIdleSegments())
+       require.Nil(t, seg.index) // idle-closed before the reopen under test
+
+       require.NoError(t, seg.incRef(ctx))
+       require.NotNil(t, seg.index, "a reclaimed segment reopens on incRef")
+       require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount))
+       seg.DecRef()
+}
+
+// TestDelete_DormantSegmentDeletedImmediately: deleting a dormant segment (no
+// active reference) closes it and removes its directory immediately.
+func TestDelete_DormantSegmentDeletedImmediately(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+       sc, ctx := newReclaimTestController(t, tempDir, time.Hour)
+
+       seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
+
+       seg.delete() // no holder, so nothing defers the teardown
+       require.Nil(t, seg.index, "deleting a dormant segment closes it 
immediately")
+       require.NoDirExists(t, seg.location, "deleting a dormant segment 
removes its directory immediately")
+}
+
+// TestDeleteExpiredSegments_DefersWhileHeld: deleteExpiredSegments defers the
+// actual delete of a segment a caller is holding until the last reference is dropped.
+func TestDeleteExpiredSegments_DefersWhileHeld(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+       sc, ctx := newReclaimTestController(t, tempDir, time.Hour)
+
+       seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+       require.NoError(t, seg.incRef(ctx)) // an in-flight query/snapshot 
holds it
+
+       require.Equal(t, int64(1), 
sc.deleteExpiredSegments([]string{seg.suffix}))
+       require.NotNil(t, seg.index, "deletion must be deferred while a 
reference is held")
+       require.DirExists(t, seg.location, "the directory must survive while a 
reference is held")
+
+       seg.DecRef() // drops the last reference, running the deferred delete
+       require.Nil(t, seg.index, "the index is released once the last 
reference is dropped")
+       require.NoDirExists(t, seg.location, "the directory is removed once the 
last reference is dropped")
+}
+
+// TestReopenCycles_NoWriterLeak: many open/reclaim cycles must not leak the bluge
+// writer lock -- every reopen (which acquires the exclusive directory lock) must keep succeeding.
+func TestReopenCycles_NoWriterLeak(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+       sc, ctx := newReclaimTestController(t, tempDir, 100*time.Millisecond)
+
+       seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+       for i := 0; i < 8; i++ { // round 0 reuses the already-open segment; later rounds reopen the reclaimed one
+               require.NoError(t, seg.incRef(ctx), "round %d: reopen must 
succeed (writer lock not leaked)", i)
+               require.NotNil(t, seg.index)
+               seg.DecRef()
+               seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+               require.True(t, seg.closeIfIdle(time.Now().UnixNano()), "round 
%d: reclaim", i)
+               require.Nil(t, seg.index)
+       }
+}
+
+// TestConcurrent_IncRefVsCloseIdle_NoStolenRefOrPanic: under concurrent incRef/DecRef racing a tight closeIdleSegments
+// loop, an acquired reference must ALWAYS observe an open index (the reclaimer
+// never steals an active reference / leaves refCount>=1 with index==nil), no
+// goroutine panics, and the terminal refCount is conserved at 0. Run with 
-race.
+func TestConcurrent_IncRefVsCloseIdle_NoStolenRefOrPanic(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+       // Nanosecond idle timeout: the reclaimer considers the segment idle on 
every
+       // tick, maximizing the race against acquirers.
+       sc, ctx := newReclaimTestController(t, tempDir, time.Nanosecond)
+       defer sc.close() // force-closes the segment after the race; runs before cleanup (LIFO)
+       seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+
+       // A fixed per-acquirer cycle count (rather than a wall-clock duration) 
keeps
+       // the workload fixed across machines: every run performs the same 
number
+       // of incRef/DecRef cycles racing closeIdleSegments, so a slow or 
single-core
+       // CPU only makes the test slower -- it never makes it pass vacuously 
or fail.
+       const (
+               acquirers    = 8
+               cyclesPerAcq = 2000
+       )
+       var (
+               acquirersWG sync.WaitGroup
+               reclaimerWG sync.WaitGroup
+               reclaimDone = make(chan struct{})
+               violations  int64
+               panics      int64
+       )
+       reclaimerWG.Add(1)
+       go func() {
+               defer reclaimerWG.Done()
+               defer func() {
+                       if r := recover(); r != nil {
+                               atomic.AddInt64(&panics, 1)
+                       }
+               }()
+               for {
+                       select {
+                       case <-reclaimDone:
+                               return
+                       default:
+                       }
+                       sc.closeIdleSegments()
+               }
+       }()
+       for i := 0; i < acquirers; i++ {
+               acquirersWG.Add(1)
+               go func() {
+                       defer acquirersWG.Done()
+                       defer func() {
+                               if r := recover(); r != nil {
+                                       atomic.AddInt64(&panics, 1)
+                               }
+                       }()
+                       for c := 0; c < cyclesPerAcq; c++ {
+                               if err := seg.incRef(ctx); err != nil {
+                                       atomic.AddInt64(&violations, 1)
+                                       return
+                               }
+                               // Invariant: a held reference implies an open 
index.
+                               seg.mu.RLock()
+                               if seg.index == nil {
+                                       atomic.AddInt64(&violations, 1)
+                               }
+                               seg.mu.RUnlock()
+                               seg.DecRef()
+                       }
+               }()
+       }
+       acquirersWG.Wait()
+       close(reclaimDone) // stop the reclaimer only after every acquirer has finished
+       reclaimerWG.Wait()
+
+       require.Equal(t, int64(0), atomic.LoadInt64(&panics), "no goroutine may 
panic")
+       require.Equal(t, int64(0), atomic.LoadInt64(&violations),
+               "a held reference must always see an open index (no stolen ref 
/ nil index)")
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "terminal 
refCount must be conserved at 0")
+}
+
+// TestCloseIfIdle_RefusesWhenMustBeDeleted: the idle reclaimer must refuse to close a segment flagged for deletion,
+// so it never races performDelete's directory removal. closeIfIdle leaves 
such a
+// segment for delete()/performDelete to tear down.
+func TestCloseIfIdle_RefusesWhenMustBeDeleted(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+       sc, ctx := newReclaimTestController(t, tempDir, time.Hour)
+
+       seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+       atomic.StoreUint32(&seg.mustBeDeleted, 1) // set the flag directly, without running delete()
+       seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano()) // also 
idle
+
+       require.False(t, seg.closeIfIdle(time.Now().UnixNano()),
+               "closeIfIdle must refuse a segment flagged for deletion")
+       require.NotNil(t, seg.index, "a flagged segment is left for 
performDelete, not idle-closed")
+}
+
+// TestIncRef_AfterDelete_ReturnsErrSegmentClosed: once a dormant segment is deleted (its directory removed by
+// performDelete), incRef must NOT reopen it -- the acquire slow path returns
+// ErrSegmentClosed rather than opening resources on the now-missing directory.
+func TestIncRef_AfterDelete_ReturnsErrSegmentClosed(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+       sc, ctx := newReclaimTestController(t, tempDir, time.Hour)
+
+       seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+       seg.delete() // dormant: closed and directory removed immediately
+       require.Nil(t, seg.index)
+       require.NoDirExists(t, seg.location)
+
+       require.ErrorIs(t, seg.incRef(ctx), ErrSegmentClosed,
+               "incRef must refuse to reopen a deleted segment")
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "a refused 
acquire adds no reference")
+}
+
+// TestDecRef_OnClosedSegment_IsNoOp: a DecRef on a segment that is already closed (refCount==0, index==nil),
+// e.g. a stale release from a caller that obtained the segment before the idle
+// reclaimer closed it, is a harmless no-op and never drives refCount negative.
+func TestDecRef_OnClosedSegment_IsNoOp(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+       sc, ctx := newReclaimTestController(t, tempDir, time.Hour)
+
+       seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+       seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+       require.True(t, seg.closeIfIdle(time.Now().UnixNano()))
+       require.Nil(t, seg.index)
+
+       seg.DecRef() // stale release on an already-closed segment
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "DecRef on 
a closed segment must not underflow")
+       require.Nil(t, seg.index, "DecRef must not resurrect a closed segment")
+}
+
+// TestPeekOldestSegmentEndTime_OnlyWhileOpen: peekOldestSegmentEndTime reports the oldest segment's end time only while
+// that segment is open (index loaded), preserving the pre-dormant-model 
behavior
+// that gated on refCount>0. An idle-closed oldest segment is reported as 
absent.
+func TestPeekOldestSegmentEndTime_OnlyWhileOpen(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+       sc, ctx := newReclaimTestController(t, tempDir, time.Hour)
+
+       // Keep-one rule: with no segment, or a single segment, peek reports 
absent.
+       _, ok := sc.peekOldestSegmentEndTime()
+       require.False(t, ok, "no segments -> not reported")
+
+       day1 := reclaimTestDay(t)
+       oldest := openReclaimTestSegment(ctx, t, sc, tempDir, day1)
+       _, ok = sc.peekOldestSegmentEndTime()
+       require.False(t, ok, "a single segment is not reported (keep-one rule)")
+
+       openReclaimTestSegment(ctx, t, sc, tempDir, day1.Add(24*time.Hour)) // 
a newer segment; peek needs len>1
+
+       // Open (dormant) oldest: its end time is reported.
+       end, ok := sc.peekOldestSegmentEndTime()
+       require.True(t, ok, "an open oldest segment must be reported")
+       require.Equal(t, oldest.End, end)
+
+       // Idle-close the oldest: it must now be reported as absent.
+       oldest.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+       require.True(t, oldest.closeIfIdle(time.Now().UnixNano()))
+       require.Nil(t, oldest.index)
+       _, ok = sc.peekOldestSegmentEndTime()
+       require.False(t, ok, "a closed oldest segment must not be reported")
+
+       // Reopen on access: it is reported again.
+       require.NoError(t, oldest.incRef(ctx)) // reopens the idle-closed oldest segment
+       end, ok = sc.peekOldestSegmentEndTime()
+       require.True(t, ok, "a reopened oldest segment is reported again")
+       require.Equal(t, oldest.End, end)
+       oldest.DecRef()
+}
+
+// TestControllerOpen_LoadsSegmentsDormant (startup): the controller's disk-scan startup path (open()) loads every
+// on-disk segment in the DORMANT state -- open (index!=nil) but with 
refCount==0
+// -- which the pre-refactor model could not express (it loaded segments with
+// refCount==1). A loaded segment must be immediately usable (a real read pins
+// and releases it) and reclaimable once idle.
+func TestControllerOpen_LoadsSegmentsDormant(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+       sc, _ := newReclaimTestController(t, tempDir, time.Hour)
+
+       day1 := reclaimTestDay(t)
+       day2 := day1.Add(24 * time.Hour)
+       for _, d := range []time.Time{day1, day2} {
+               segPath := filepath.Join(tempDir, "seg-"+d.Format(dayFormat)) // sc.open() must discover these dirs (Len check below)
+               require.NoError(t, os.MkdirAll(segPath, DirPerm))
+               require.NoError(t, os.WriteFile(filepath.Join(segPath, 
metadataFilename), []byte(currentVersion), FilePerm))
+       }
+
+       require.NoError(t, sc.open())
+       require.Len(t, sc.lst, 2)
+
+       // Startup invariant: every loaded segment is open yet dormant.
+       for _, s := range sc.lst {
+               require.NotNil(t, s.index, "a freshly loaded segment must be 
open")
+               require.Equal(t, int32(0), atomic.LoadInt32(&s.refCount), "a 
freshly loaded segment must be dormant")
+       }
+
+       // Usable: a real read reopens/pins each segment, and the matching 
DecRef
+       // returns it to dormant (still open, refCount back to 0).
+       tr := timestamp.NewInclusiveTimeRange(day1, day2.Add(24*time.Hour))
+       got, err := sc.selectSegments(tr, true) // true: reopen/pin each selected segment
+       require.NoError(t, err)
+       require.Len(t, got, 2)
+       for _, s := range got {
+               s.DecRef()
+       }
+       for _, s := range sc.lst {
+               require.Equal(t, int32(0), atomic.LoadInt32(&s.refCount), "back 
to dormant after a read/release cycle")
+               require.NotNil(t, s.index, "still open after a read/release 
cycle")
+       }
+
+       // Reclaimable: once idle, the reclaimer closes every loaded segment.
+       for _, s := range sc.lst {
+               s.lastAccessed.Store(time.Now().Add(-2 * time.Hour).UnixNano())
+       }
+       require.Equal(t, 2, sc.closeIdleSegments())
+       for _, s := range sc.lst {
+               require.Nil(t, s.index, "a loaded segment is reclaimable to 
closed")
+       }
+}
+
+// TestControllerClose_ReleasesAllAndDeletesFlagged (shutdown): the controller's close() releases the resources of EVERY
+// segment regardless of refCount (DecRef no longer closes on reaching zero), 
so
+// it must force-close even an actively-held segment. A segment flagged for
+// deletion additionally has its directory removed; the rest keep their data.
+func TestControllerClose_ReleasesAllAndDeletesFlagged(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+       sc, ctx := newReclaimTestController(t, tempDir, time.Hour)
+
+       day1 := reclaimTestDay(t)
+       dormant := openReclaimTestSegment(ctx, t, sc, tempDir, day1)
+       held := openReclaimTestSegment(ctx, t, sc, tempDir, 
day1.Add(24*time.Hour))
+       flagged := openReclaimTestSegment(ctx, t, sc, tempDir, 
day1.Add(48*time.Hour))
+
+       require.NoError(t, held.incRef(ctx)) // an active reference survives 
into shutdown
+       require.Equal(t, int32(1), atomic.LoadInt32(&held.refCount))
+       atomic.StoreUint32(&flagged.mustBeDeleted, 1)
+       dormantLoc, heldLoc, flaggedLoc := dormant.location, held.location, 
flagged.location
+
+       sc.close() // full shutdown: force-closes every segment regardless of refCount
+
+       require.Empty(t, sc.lst, "controller segment list is cleared on 
shutdown")
+       require.Nil(t, dormant.index, "dormant segment resources are released 
on shutdown")
+       require.Nil(t, held.index, "an actively-held segment is force-closed on 
shutdown")
+       require.Nil(t, flagged.index, "flagged segment resources are released 
on shutdown")
+
+       // Only the flagged segment's directory is removed.
+       require.DirExists(t, dormantLoc, "a normal segment keeps its data on 
disk")
+       require.DirExists(t, heldLoc, "a held segment keeps its data on disk")
+       require.NoDirExists(t, flaggedLoc, "a segment flagged for deletion is 
removed on shutdown")
+}
+
+// TestRemoveOldest_DeletesDormantOldestKeepsRest: removeOldest (disk-pressure retention) deletes the oldest dormant segment
+// outright -- closing it and removing its directory immediately via the new
+// delete() path -- while keeping the rest.
+func TestRemoveOldest_DeletesDormantOldestKeepsRest(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+       sc, ctx := newReclaimTestController(t, tempDir, time.Hour)
+
+       day1 := reclaimTestDay(t)
+       oldest := openReclaimTestSegment(ctx, t, sc, tempDir, day1)
+       newer := openReclaimTestSegment(ctx, t, sc, tempDir, 
day1.Add(24*time.Hour))
+       oldestLoc := oldest.location // captured before removal for the directory check
+
+       removed, err := sc.removeOldest()
+       require.NoError(t, err)
+       require.True(t, removed)
+       require.Len(t, sc.lst, 1)
+       require.Equal(t, newer.id, sc.lst[0].id, "the newer segment is kept")
+       require.Nil(t, oldest.index, "the removed oldest is closed")
+       require.NoDirExists(t, oldestLoc, "a dormant oldest segment's directory 
is removed immediately")
+}
+
+// TestRemoveOldest_KeepOneRule: removeOldest honors the keep-one rule -- it never deletes the last
+// remaining segment.
+func TestRemoveOldest_KeepOneRule(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+       sc, ctx := newReclaimTestController(t, tempDir, time.Hour)
+
+       only := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t)) // the sole segment in sc.lst
+       removed, err := sc.removeOldest()
+       require.NoError(t, err)
+       require.False(t, removed, "the last remaining segment must never be 
removed")
+       require.Len(t, sc.lst, 1)
+       require.NotNil(t, only.index, "the kept segment stays open")
+}
+
+// TestRemoveOldest_DefersWhileOldestHeld: removeOldest defers the actual teardown of the oldest segment while a
+// caller still holds a reference: it leaves the list immediately, but its 
index
+// and directory survive until the last DecRef.
+func TestRemoveOldest_DefersWhileOldestHeld(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+       sc, ctx := newReclaimTestController(t, tempDir, time.Hour)
+
+       day1 := reclaimTestDay(t)
+       oldest := openReclaimTestSegment(ctx, t, sc, tempDir, day1)
+       openReclaimTestSegment(ctx, t, sc, tempDir, day1.Add(24*time.Hour))
+       require.NoError(t, oldest.incRef(ctx)) // an in-flight reader holds the 
oldest
+       oldestLoc := oldest.location
+
+       removed, err := sc.removeOldest()
+       require.NoError(t, err)
+       require.True(t, removed)
+       require.Len(t, sc.lst, 1, "the oldest is dropped from the list 
immediately")
+       require.NotNil(t, oldest.index, "deletion is deferred while a reference 
is held")
+       require.DirExists(t, oldestLoc, "the directory survives while held")
+
+       oldest.DecRef() // last reference: the deferred teardown runs now
+       require.Nil(t, oldest.index)
+       require.NoDirExists(t, oldestLoc, "the directory is removed once the 
last reference drops")
+}
+
+// TestGetExpiredSegmentsTimeRange: getExpiredSegmentsTimeRange spans exactly the segments older than the TTL
+// deadline, and -- going through segments(false) -- must not leave a leaked
+// reference on any scanned dormant segment.
+func TestGetExpiredSegmentsTimeRange(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+       sc, ctx := newReclaimTestController(t, tempDir, time.Hour) // opts.TTL 
= 7 days
+
+       today := reclaimTestDay(t)
+       expired := openReclaimTestSegment(ctx, t, sc, tempDir, 
today.Add(-10*24*time.Hour))
+       openReclaimTestSegment(ctx, t, sc, tempDir, today) // fresh, not expired
+
+       tr := sc.getExpiredSegmentsTimeRange() // only the 10-day-old segment is past the 7-day TTL
+       require.Equal(t, expired.Start, tr.Start, "range starts at the expired 
segment")
+       require.Equal(t, expired.End, tr.End, "range ends at the expired 
segment")
+
+       for _, s := range sc.lst {
+               require.Equal(t, int32(0), atomic.LoadInt32(&s.refCount),
+                       "scanning the expired range via segments(false) must 
not leak references")
+               require.NotNil(t, s.index, "scanning must not close any 
segment")
+       }
+}
diff --git a/banyand/internal/storage/segment_lifecycle_integration_test.go 
b/banyand/internal/storage/segment_lifecycle_integration_test.go
new file mode 100644
index 000000000..0e21a7941
--- /dev/null
+++ b/banyand/internal/storage/segment_lifecycle_integration_test.go
@@ -0,0 +1,218 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+       "path/filepath"
+       "strconv"
+       "sync"
+       "sync/atomic"
+       "testing"
+
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/pkg/index/inverted"
+)
+
+// TestSegmentLifecycle_EndToEnd is a full-flow integration test that drives 
the
+// entire segment lifecycle through the public TSDB API against a real series
+// index (which persists to disk), asserting the dormant-refcount invariants 
and
+// on-disk durability at every transition:
+//
+//     create+write -> active(refCount==1) -> DecRef(dormant, still open) ->
+//     query reopen/pin/release -> snapshot via live state ->
+//     idle reclaim (flush to disk, release writer lock) ->
+//     closed-segment stats + snapshot (read-only / hard-link, NO reopen) ->
+//     reopen on read -> restart (Close + reopen, data survives on disk) ->
+//     retention delete (directory removed).
+func TestSegmentLifecycle_EndToEnd(t *testing.T) {
+       dir := snapshotTestDir(t)
+       const seriesCount = 12
+       tr := allTimeRange()
+
+       tsdb, sc := openSnapshotTSDB(t, dir, 3)
+
+       // 1. Create + write: a fresh segment is open and actively referenced.
+       s, err := tsdb.CreateSegmentIfNotExist(snapshotTestBase)
+       require.NoError(t, err)
+       seg := s.(*segment[*MockTSTable, any])
+       require.NotNil(t, seg.index, "a freshly created segment is open")
+       require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount), "create 
yields exactly one active reference")
+       require.NoError(t, s.IndexDB().Insert(buildTestSeriesDocs(t, 
seriesCount)))
+
+       // 2. Release to dormant: DecRef to zero keeps the segment OPEN (the
+       // dormant-refcount invariant -- it does not close), and the data stays 
readable.
+       s.DecRef()
+       require.NotNil(t, seg.index, "DecRef to zero leaves the segment open 
(dormant), not closed")
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
+       count, size := seg.SeriesIndexStats()
+       require.Equal(t, int64(seriesCount), count, "data is readable while 
dormant")
+       require.Positive(t, size)
+
+       // 3. Query path: a real read reopens-if-needed, pins the segment, and 
the
+       // matching DecRef returns it to dormant (still open).
+       got, err := tsdb.SelectSegments(tr, true)
+       require.NoError(t, err)
+       require.Len(t, got, 1)
+       require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount), "a query 
pins the segment")
+       for _, g := range got {
+               g.DecRef()
+       }
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "query 
released back to dormant")
+       require.NotNil(t, seg.index)
+
+       // 4. Snapshot while open (dormant): goes through the live-state path, 
must
+       // not close the segment, and captures every document.
+       openSnap := filepath.Join(t.TempDir(), "snap-open")
+       created, err := tsdb.TakeFileSnapshot(openSnap)
+       require.NoError(t, err)
+       require.True(t, created)
+       require.NotNil(t, seg.index, "snapshot must not close an open segment")
+       require.Equal(t, int64(seriesCount), snapshotSeriesDocCount(t, 
openSnap, seg), "open-path snapshot holds all docs")
+
+       // 5. Idle reclaim: closes the segment, flushing the index to disk and
+       // releasing the bluge exclusive writer lock.
+       idleClose(t, sc, seg)
+       require.NoFileExists(t, filepath.Join(seg.location, seriesIndexDirName, 
inverted.LockFilename),
+               "idle-close releases the bluge writer lock (bluge.pid)")
+
+       // 6. Closed segment: stats are read from disk read-only and must NOT 
reopen
+       // the writable index.
+       count, size = seg.SeriesIndexStats()
+       require.Equal(t, int64(seriesCount), count, "closed-segment data is 
read from disk")
+       require.Positive(t, size)
+       require.Nil(t, seg.index, "reading stats must not reopen a closed 
segment")
+
+       // 7. Snapshot while closed: hard-link path, identical content, still 
no reopen.
+       closedSnap := filepath.Join(t.TempDir(), "snap-closed")
+       created, err = tsdb.TakeFileSnapshot(closedSnap)
+       require.NoError(t, err)
+       require.True(t, created)
+       require.Nil(t, seg.index, "snapshot must not reopen a closed segment")
+       require.Equal(t, int64(seriesCount), snapshotSeriesDocCount(t, 
closedSnap, seg),
+               "closed-path snapshot captures the same docs as the open-path 
snapshot")
+
+       // 8. Reopen on access: a real read reopens the closed segment.
+       got, err = tsdb.SelectSegments(tr, true)
+       require.NoError(t, err)
+       require.Len(t, got, 1)
+       require.NotNil(t, seg.index, "a read reopens a closed segment")
+       for _, g := range got {
+               g.DecRef()
+       }
+
+       // 9. Restart: closing the DB flushes everything; reopening reloads the
+       // segment dormant and the series data survives on disk.
+       require.NoError(t, tsdb.Close())
+       tsdb2, sc2 := openSnapshotTSDB(t, dir, 3)
+       require.Len(t, sc2.lst, 1, "the segment is reloaded from disk on 
restart")
+       reloaded := sc2.lst[0]
+       require.NotNil(t, reloaded.index, "a reloaded segment is open")
+       require.Equal(t, int32(0), atomic.LoadInt32(&reloaded.refCount), "a 
reloaded segment starts dormant")
+       count, _ = reloaded.SeriesIndexStats()
+       require.Equal(t, int64(seriesCount), count, "series data survives a 
restart on disk")
+
+       // 10. Retention: removeOldest honors keep-one, so add a newer segment, 
then
+       // delete the oldest -- its directory is removed from disk.
+       _ = createSegmentWithSeries(t, tsdb2, snapshotDay(1), 3)
+       oldestLoc := reloaded.location
+       removed, err := sc2.removeOldest()
+       require.NoError(t, err)
+       require.True(t, removed)
+       require.NoDirExists(t, oldestLoc, "retention removes the oldest 
segment's directory")
+       require.Len(t, sc2.lst, 1, "the newer segment remains after retention")
+
+       require.NoError(t, tsdb2.Close())
+}
+
+// TestSegment_PostDeleteOperations_NoPanic pins the safety of operations that
+// can still run against a segment AFTER it has been deleted (its directory
+// removed): inspect (SeriesIndexStats + CollectClosedShardInfo) and backup
+// (snapshotInto). Under the dormant-refcount model a dormant segment is NOT
+// pinned by the stats path, so these can race a concurrent delete that tears
+// down resources and removes the directory. All such operations must degrade
+// gracefully (no panic, best-effort zero results), never read a removed dir
+// fatally. Run with -race.
+func TestSegment_PostDeleteOperations_NoPanic(t *testing.T) {
+       dir := snapshotTestDir(t)
+       tsdb, _, seg := openSnapshotTestTSDB(t, dir)
+       defer func() { _ = tsdb.Close() }()
+
+       // Real on-disk data so inspect/snapshot do meaningful work before 
deletion.
+       require.NoError(t, seg.IndexDB().Insert(buildTestSeriesDocs(t, 8)))
+
+       snapBase := t.TempDir()
+       const (
+               readers      = 4
+               snapshotters = 2
+               cycles       = 200
+       )
+       var (
+               wg      sync.WaitGroup
+               ready   sync.WaitGroup
+               panics  int64
+               snapSeq int64
+       )
+       ready.Add(readers + snapshotters)
+       // Each worker runs a fixed number of cycles (no wall-clock timing) and 
signals
+       // `ready` after its first iteration. The delete fires only once every 
worker
+       // has run at least once, so it is guaranteed to race operations that 
are still
+       // looping -- deterministically, on any CPU, with no sleep a slow 
machine could
+       // under- or over-shoot.
+       guard := func(fn func()) {
+               defer wg.Done()
+               defer func() {
+                       if r := recover(); r != nil {
+                               atomic.AddInt64(&panics, 1)
+                       }
+               }()
+               for c := 0; c < cycles; c++ {
+                       fn()
+                       if c == 0 {
+                               ready.Done()
+                       }
+               }
+       }
+
+       // Inspect readers.
+       for i := 0; i < readers; i++ {
+               wg.Add(1)
+               go guard(func() {
+                       _, _ = seg.SeriesIndexStats()
+                       _, _ = CollectClosedShardInfo(seg.Location())
+               })
+       }
+       // Backup writers, each to its own destination.
+       for i := 0; i < snapshotters; i++ {
+               wg.Add(1)
+               go guard(func() {
+                       dst := filepath.Join(snapBase, 
"snap-"+strconv.FormatInt(atomic.AddInt64(&snapSeq, 1), 10))
+                       _, _ = seg.snapshotInto(dst)
+               })
+       }
+
+       ready.Wait()
+       seg.delete() // races the operations that are still looping
+       wg.Wait()
+
+       require.Equal(t, int64(0), atomic.LoadInt64(&panics),
+               "inspect/snapshot operations on a deleted segment must not 
panic")
+       require.NoDirExists(t, seg.location,
+               "once no operation holds a reference, the deleted segment's 
directory is gone")
+       require.Nil(t, seg.index, "the deleted segment is closed")
+}
diff --git a/banyand/internal/storage/segment_test.go 
b/banyand/internal/storage/segment_test.go
index 20175520b..640b34eb9 100644
--- a/banyand/internal/storage/segment_test.go
+++ b/banyand/internal/storage/segment_test.go
@@ -140,21 +140,26 @@ func TestSegmentOpenAndReopen(t *testing.T) {
        require.NoError(t, err)
        require.NotNil(t, segment)
 
-       // Verify segment is open
-       assert.Greater(t, segment.refCount, int32(0))
+       // A freshly opened segment is dormant: resources open (index!=nil) 
with no
+       // active reference (refCount==0).
+       assert.NotNil(t, segment.index)
+       assert.Equal(t, int32(0), segment.refCount)
 
-       // Close segment by decrementing reference count
-       initialRefCount := segment.refCount
+       // Acquire an active reference, then release it: DecRef to zero leaves 
the
+       // segment dormant (open), it does NOT close.
+       require.NoError(t, segment.incRef(ctx))
+       assert.Equal(t, int32(1), segment.refCount)
        segment.DecRef()
+       assert.Equal(t, int32(0), segment.refCount)
+       assert.NotNil(t, segment.index)
 
-       // Verify segment is closed (refCount reduced)
-       assert.Equal(t, initialRefCount-1, segment.refCount)
-
-       // Reopen segment
-       segment.incRef(ctx)
-
-       // Verify segment is properly reopened
-       assert.Equal(t, initialRefCount, segment.refCount)
+       // Idle-close it (resources released), then incRef reopens it.
+       segment.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+       require.True(t, segment.closeIfIdle(time.Now().UnixNano()))
+       assert.Nil(t, segment.index)
+       require.NoError(t, segment.incRef(ctx))
+       assert.Equal(t, int32(1), segment.refCount)
+       defer segment.DecRef()
 
        // Verify we can still access segment data
        assert.NotNil(t, segment.index)
@@ -234,17 +239,22 @@ func TestSegmentCloseIfIdle(t *testing.T) {
        segment, err := sc.openSegment(ctx, startTime, endTime, segmentPath, 
suffix, sc.groupCache)
        require.NoError(t, err)
 
-       // Force last access time to be in the past
+       // A freshly opened segment is dormant (open, refCount==0).
+       require.NotNil(t, segment.index)
+       require.Equal(t, int32(0), segment.refCount)
+
+       // Force last access time to be in the past so the idle reclaimer 
closes it.
        segment.lastAccessed.Store(time.Now().Add(-time.Minute).UnixNano())
 
-       // Close if idle should succeed
-       segment.DecRef()
+       // closeIfIdle releases the dormant, idle segment's resources.
+       require.True(t, segment.closeIfIdle(time.Now().UnixNano()))
 
        // Verify segment is closed
        assert.Nil(t, segment.index)
 
        // Test reopening the segment
-       segment.incRef(ctx)
+       require.NoError(t, segment.incRef(ctx))
+       defer segment.DecRef()
 
        // Verify segment is properly reopened
        assert.NotNil(t, segment.index)
@@ -339,10 +349,10 @@ func TestCloseIdleAndSelectSegments(t *testing.T) {
        // Verify we have three segments
        require.Len(t, sc.lst, 3)
 
-       // Make sure all segments have reference counts > 0
-       require.Greater(t, seg1.refCount, int32(0))
-       require.Greater(t, seg2.refCount, int32(0))
-       require.Greater(t, seg3.refCount, int32(0))
+       // All segments start dormant: open (index!=nil) with no active 
reference.
+       require.NotNil(t, seg1.index)
+       require.NotNil(t, seg2.index)
+       require.NotNil(t, seg3.index)
 
        // Force segments 1 and 3 to be idle (setting last accessed time in the 
past)
        seg1.lastAccessed.Store(time.Now().Add(-time.Second).UnixNano())
@@ -590,9 +600,9 @@ func TestDeleteExpiredSegmentsWithClosedSegments(t 
*testing.T) {
                segments = append(segments, segment)
        }
 
-       // Verify all segments are initially open
+       // Verify all segments are initially open (dormant: index!=nil, 
refCount==0)
        for i, seg := range segments {
-               assert.Greater(t, seg.refCount, int32(0), "Segment %d should be 
open", i)
+               assert.NotNil(t, seg.index, "Segment %d should be open", i)
        }
 
        // Set the "lastAccessed" time for some segments to make them idle
@@ -614,12 +624,12 @@ func TestDeleteExpiredSegmentsWithClosedSegments(t 
*testing.T) {
        closedCount := sc.closeIdleSegments()
        assert.Equal(t, 3, closedCount, "Should have closed 3 segments")
 
-       // Verify segments 0, 2, and 4 are closed
-       assert.Equal(t, int32(0), segments[0].refCount, "Segment 0 should be 
closed")
+       // Verify segments 0, 2, and 4 are closed (index released)
+       assert.Nil(t, segments[0].index, "Segment 0 should be closed")
        assert.NotNil(t, segments[1].index, "Segment 1 should remain open")
-       assert.Equal(t, int32(0), segments[2].refCount, "Segment 2 should be 
closed")
+       assert.Nil(t, segments[2].index, "Segment 2 should be closed")
        assert.NotNil(t, segments[3].index, "Segment 3 should remain open")
-       assert.Equal(t, int32(0), segments[4].refCount, "Segment 4 should be 
closed")
+       assert.Nil(t, segments[4].index, "Segment 4 should be closed")
        assert.NotNil(t, segments[5].index, "Segment 5 should remain open")
 
        // Now delete expired segments
@@ -917,10 +927,11 @@ func TestSegment_IndexDB_ReturnsUntypedNilWhenClosed(t 
*testing.T) {
                startTime.Format(dayFormat), sc.groupCache)
        require.NoError(t, err)
 
-       // Idle-close: drop the only reference so performCleanup runs and the
-       // segment ends up in the residual state seen on cold-tier nodes
-       // (refCount=0, s.index=nil, but segment still reachable for reopen).
-       seg.DecRef()
+       // Idle-close: the reclaimer releases the dormant segment's resources 
so it
+       // ends up in the residual state seen on cold-tier nodes (refCount=0,
+       // s.index=nil, but segment still reachable for reopen).
+       seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+       require.True(t, seg.closeIfIdle(time.Now().UnixNano()))
        require.Nil(t, seg.index)
        require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
 
@@ -949,18 +960,17 @@ func TestSegment_IndexDB_ReturnsUntypedNilWhenClosed(t 
*testing.T) {
 
 // TestSegment_ConcurrentReopen_RefCountConsistent stress-tests the
 // reference-count contract of segment.incRef under concurrent reopen.
-// Before the fix, segment.initialize() returned without AddInt32 when
+// Before the fix, the reopen slow path returned without AddInt32 when
 // another goroutine had already reopened the segment, so the second caller
 // "owned" a reference it had never accounted for. The first DecRef would
-// then trigger performCleanup while another goroutine was still using the
-// segment.
+// then tear the segment down while another goroutine was still using it.
 //
 // After the fix, every concurrent incRef must add exactly one reference
 // regardless of which path it takes.
 //
 // Reliability note: a single 64-goroutine round can occasionally serialize
 // well enough that only the first caller enters the slow path
-// (initialize) and the rest take the fast path (CAS+1). That serialized
+// (acquire) and the rest take the fast path (CAS+1). That serialized
 // shape would also pass on the buggy code. To make the test deterministic
 // across machines, the experiment is repeated for `rounds` cycles, each
 // starting from the residual state. The bug becomes effectively
@@ -1010,8 +1020,9 @@ func TestSegment_ConcurrentReopen_RefCountConsistent(t 
*testing.T) {
                startTime.Format(dayFormat), sc.groupCache)
        require.NoError(t, err)
 
-       // Bring the segment into the cold-tier residual state.
-       seg.DecRef()
+       // Bring the segment into the cold-tier residual (closed) state.
+       seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+       require.True(t, seg.closeIfIdle(time.Now().UnixNano()))
        require.Nil(t, seg.index)
        require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
 
@@ -1048,16 +1059,21 @@ func TestSegment_ConcurrentReopen_RefCountConsistent(t 
*testing.T) {
                        "round %d: every concurrent incRef must add exactly one 
reference", r)
                require.NotNil(t, seg.index, "round %d: segment.index must be 
reopened", r)
 
-               // Drain references; the last DecRef must trigger performCleanup
-               // and bring the segment back to the residual state for the next
-               // round.
+               // Drain references. DecRef to zero leaves the segment dormant 
(open);
+               // the idle reclaimer then closes it, restoring the residual 
state for
+               // the next round.
                for i := 0; i < N; i++ {
                        seg.DecRef()
                }
                require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount),
                        "round %d", r)
+               require.NotNil(t, seg.index,
+                       "round %d: draining to zero leaves the segment dormant 
(open)", r)
+               seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+               require.True(t, seg.closeIfIdle(time.Now().UnixNano()),
+                       "round %d: the reclaimer must close the dormant 
segment", r)
                require.Nil(t, seg.index,
-                       "round %d: segment.index must be cleared by 
performCleanup", r)
+                       "round %d: segment.index must be cleared by the 
reclaimer", r)
        }
 }
 
@@ -1065,14 +1081,15 @@ func TestSegment_ConcurrentReopen_RefCountConsistent(t 
*testing.T) {
 // reopen pattern under heavy concurrency: many goroutines repeatedly call
 // selectSegments, exercise IndexDB().Stats() on each returned segment, and
 // release with DecRef. With idle-closed segments as the starting state,
-// each iteration triggers initialize() to reopen the segment and the final
-// DecRef triggers performCleanup() to close it again, so closes and
-// reopens are interleaved across goroutines.
+// each iteration triggers acquire()/initialize() to reopen the segment;
+// DecRef leaves it dormant (open, refCount==0) rather than closing it, so
+// reopens and dormant releases interleave across goroutines.
 //
 // Verifies (run with -race):
 //   - no goroutine panics (covers both the typed-nil and refcount fixes)
 //   - no data race is detected on the segment lifecycle
-//   - all segments end up cleanly closed (refCount == 0, index == nil)
+//   - no segment leaks an active reference (refCount == 0 at the end)
+//   - every dormant segment is then reclaimable to closed via the idle 
reclaimer
 func TestSegment_ConcurrentReopenAndClose_NoPanic(t *testing.T) {
        tempDir, cleanup := setupTestEnvironment(t)
        defer cleanup()
@@ -1123,8 +1140,9 @@ func TestSegment_ConcurrentReopenAndClose_NoPanic(t 
*testing.T) {
                sc.lst = append(sc.lst, seg)
                sc.sortLst()
                sc.Unlock()
-               // Drop the open reference so each segment starts in the 
residual state.
-               seg.DecRef()
+               // Force each segment into the residual (closed) state to start.
+               seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+               require.True(t, seg.closeIfIdle(time.Now().UnixNano()))
                require.Nil(t, seg.index)
        }
 
@@ -1174,10 +1192,19 @@ func TestSegment_ConcurrentReopenAndClose_NoPanic(t 
*testing.T) {
        sc.RLock()
        final := append([]*segment[mockTSTable, mockTSTableOpener]{}, sc.lst...)
        sc.RUnlock()
+       // No goroutine may leak an active reference.
        for _, s := range final {
                require.Equal(t, int32(0), atomic.LoadInt32(&s.refCount),
                        "segment %s leaked references", s.suffix)
-               require.Nil(t, s.index, "segment %s should be cleaned up", 
s.suffix)
+       }
+       // After the churn the segments are dormant (open, refCount==0); the 
idle
+       // reclaimer must be able to close every one of them cleanly.
+       for _, s := range final {
+               s.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+       }
+       sc.closeIdleSegments()
+       for _, s := range final {
+               require.Nil(t, s.index, "segment %s should be reclaimable to 
closed", s.suffix)
        }
 }
 
diff --git a/banyand/internal/storage/snapshot_closed_segment_test.go 
b/banyand/internal/storage/snapshot_closed_segment_test.go
index 11b6be406..5c4719dad 100644
--- a/banyand/internal/storage/snapshot_closed_segment_test.go
+++ b/banyand/internal/storage/snapshot_closed_segment_test.go
@@ -88,6 +88,10 @@ func openSnapshotTSDB(t *testing.T, dir string, ttlDays int) 
(TSDB[*MockTSTable,
                TSTableCreator:     MockTSTableCreator,
                SegmentIdleTimeout: time.Hour,
        }
+       // Pin the controller's clock to snapshotTestBase so retention's 
deadline is
+       // computed from the same clock the segments are created against; 
otherwise the
+       // fixed-2024 segments would look retention-expired against the real 
wall clock
+       // and SelectSegments would filter them out.
        mc := timestamp.NewMockClock()
        mc.Set(snapshotTestBase)
        ctx := timestamp.SetClock(context.Background(), mc)
@@ -103,7 +107,7 @@ func openSnapshotTestTSDB(t *testing.T, dir string) 
(TSDB[*MockTSTable, any], *s
        tsdb, sc := openSnapshotTSDB(t, dir, 3)
        seg, err := tsdb.CreateSegmentIfNotExist(snapshotTestBase)
        require.NoError(t, err)
-       seg.DecRef() // leave it at its open baseline
+       seg.DecRef() // release the create ref, leaving it open but dormant 
(refCount==0)
        require.Len(t, sc.lst, 1)
        return tsdb, sc, sc.lst[0]
 }
@@ -195,7 +199,7 @@ func TestSeriesIndexStats_ClosedSegmentIsNotReopened(t 
*testing.T) {
        defer func() { require.NoError(t, tsdb.Close()) }()
 
        // Write a known number of series into the open segment's index, then 
drive
-       // it idle-closed (performCleanup flushes the index to disk on close).
+       // it idle-closed (closeIfIdle flushes the index to disk on close).
        const seriesCount = 10
        require.NoError(t, seg.IndexDB().Insert(buildTestSeriesDocs(t, 
seriesCount)))
 
@@ -308,7 +312,7 @@ func snapshotDay(i int) time.Time {
 
 // createSegmentWithSeries creates (via the controller) the segment containing
 // ts, inserts n series documents into its series index, and returns the
-// internal segment left at its open baseline (the create reference is 
released).
+// internal segment left open but dormant (the create reference is released).
 func createSegmentWithSeries(t *testing.T, tsdb TSDB[*MockTSTable, any], ts 
time.Time, n int) *segment[*MockTSTable, any] {
        t.Helper()
        s, err := tsdb.CreateSegmentIfNotExist(ts)
@@ -529,8 +533,9 @@ func idleClose(t *testing.T, sc 
*segmentController[*MockTSTable, any], seg *segm
 }
 
 // TestSelectSegments_ReopenTrue_ReopensClosedSegment: reopenClosed=true on a
-// CLOSED segment reopens it (incRef -> initialize), restores refCount to 1, 
and
-// refreshes lastAccessed (a real read keeps the segment hot).
+// CLOSED segment reopens it (incRef -> acquire), takes refCount to 1, and
+// refreshes lastAccessed (a real read keeps the segment hot). DecRef to zero
+// then leaves it dormant (open), not closed.
 func TestSelectSegments_ReopenTrue_ReopensClosedSegment(t *testing.T) {
        dir := snapshotTestDir(t)
        tsdb, sc := newEmptySnapshotTSDB(t, dir)
@@ -544,36 +549,38 @@ func TestSelectSegments_ReopenTrue_ReopensClosedSegment(t 
*testing.T) {
        require.NoError(t, err)
        require.Len(t, got, 1)
        require.NotNil(t, seg.index, "reopenClosed=true must reopen a closed 
segment")
-       require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount), "reopen 
reinitializes refCount to 1")
+       require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount), "acquire 
takes refCount to 1")
        require.Greater(t, seg.lastAccessed.Load(), closedAccessed, "a real 
read refreshes lastAccessed")
 
        got[0].DecRef()
-       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "DecRef 
returns the reopened segment to closed")
-       require.Nil(t, seg.index)
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
+       require.NotNil(t, seg.index, "DecRef to zero leaves the reopened 
segment dormant (open), not closed")
 }
 
-// TestSelectSegments_ReopenTrue_OpenSegmentBumpsAndRefreshes: 
reopenClosed=true
-// on an OPEN segment pins it (+1) and refreshes lastAccessed without 
re-opening.
-func TestSelectSegments_ReopenTrue_OpenSegmentBumpsAndRefreshes(t *testing.T) {
+// TestSelectSegments_ReopenTrue_DormantSegmentAcquiredAndRefreshed:
+// reopenClosed=true on a DORMANT open segment (refCount==0, index!=nil) 
acquires
+// it (0->1) and refreshes lastAccessed without re-opening; DecRef returns it 
to
+// dormant.
+func TestSelectSegments_ReopenTrue_DormantSegmentAcquiredAndRefreshed(t 
*testing.T) {
        dir := snapshotTestDir(t)
        tsdb, sc := newEmptySnapshotTSDB(t, dir)
        defer func() { require.NoError(t, tsdb.Close()) }()
 
        seg := createSegmentWithSeries(t, tsdb, snapshotDay(0), 0)
        require.NotNil(t, seg.index)
-       require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount), "open 
baseline refCount is 1")
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "dormant 
baseline: open, refCount 0")
        seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
        before := seg.lastAccessed.Load()
 
        got, err := sc.selectSegments(allTimeRange(), true)
        require.NoError(t, err)
        require.Len(t, got, 1)
-       require.NotNil(t, seg.index, "open segment stays open")
-       require.Equal(t, int32(2), atomic.LoadInt32(&seg.refCount), "incRef 
bumps the open segment")
+       require.NotNil(t, seg.index, "dormant open segment stays open (no 
reopen needed)")
+       require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount), "acquire 
takes the dormant segment 0->1")
        require.Greater(t, seg.lastAccessed.Load(), before, "a real read 
refreshes lastAccessed")
 
        got[0].DecRef()
-       require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount))
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
 }
 
 // TestSelectSegments_ReopenFalse_ClosedSegmentNotReopened: reopenClosed=false 
on
@@ -600,28 +607,31 @@ func 
TestSelectSegments_ReopenFalse_ClosedSegmentNotReopened(t *testing.T) {
        require.Nil(t, seg.index)
 }
 
-// TestSelectSegments_ReopenFalse_OpenSegmentPinnedNoRefresh: 
reopenClosed=false
-// on an OPEN segment pins it (+1) but does NOT refresh lastAccessed, so the
-// segment stays eligible for the next idle-close tick.
-func TestSelectSegments_ReopenFalse_OpenSegmentPinnedNoRefresh(t *testing.T) {
+// TestSelectSegments_ReopenFalse_DormantSegmentNotPinned: reopenClosed=false 
on a
+// DORMANT open segment (refCount==0) returns it WITHOUT pinning. By design the
+// stats path never bumps a refCount==0 segment (that would race the idle
+// reclaimer); stats are best-effort. The segment stays open and its idle timer
+// is not refreshed.
+func TestSelectSegments_ReopenFalse_DormantSegmentNotPinned(t *testing.T) {
        dir := snapshotTestDir(t)
        tsdb, sc := newEmptySnapshotTSDB(t, dir)
        defer func() { require.NoError(t, tsdb.Close()) }()
 
        seg := createSegmentWithSeries(t, tsdb, snapshotDay(0), 0)
-       require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount))
+       require.NotNil(t, seg.index)
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "dormant: 
open, refCount 0")
        seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
        before := seg.lastAccessed.Load()
 
        got, err := sc.selectSegments(allTimeRange(), false)
        require.NoError(t, err)
        require.Len(t, got, 1)
-       require.NotNil(t, seg.index, "open segment stays open")
-       require.Equal(t, int32(2), atomic.LoadInt32(&seg.refCount), "open 
segment is pinned (+1)")
+       require.NotNil(t, seg.index, "dormant open segment stays open")
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "a dormant 
segment is not pinned by the stats path")
        require.Equal(t, before, seg.lastAccessed.Load(), "a stats peek must 
not refresh lastAccessed")
 
-       got[0].DecRef()
-       require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount))
+       got[0].DecRef() // no-op
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
 }
 
 // TestSelectSegments_ReopenFalse_InUseSegmentPinned: reopenClosed=false on a
@@ -633,17 +643,17 @@ func TestSelectSegments_ReopenFalse_InUseSegmentPinned(t 
*testing.T) {
        defer func() { require.NoError(t, tsdb.Close()) }()
 
        seg := createSegmentWithSeries(t, tsdb, snapshotDay(0), 0)
-       require.NoError(t, seg.incRef(context.Background())) // a concurrent 
reader holds a ref
-       require.Equal(t, int32(2), atomic.LoadInt32(&seg.refCount), "in-use: 
baseline + one active reader")
+       require.NoError(t, seg.incRef(context.Background())) // an active 
reader holds a ref
+       require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount), "one active 
reader")
 
        got, err := sc.selectSegments(allTimeRange(), false)
        require.NoError(t, err)
        require.Len(t, got, 1)
-       require.Equal(t, int32(3), atomic.LoadInt32(&seg.refCount), "in-use 
open segment is pinned (+1)")
+       require.Equal(t, int32(2), atomic.LoadInt32(&seg.refCount), "an in-use 
(refCount>0) segment is pinned (+1)")
 
        got[0].DecRef() // release the selectSegments pin
-       seg.DecRef()    // release the simulated active reader
-       require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount), "back to 
open baseline")
+       seg.DecRef()    // release the active reader
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "back to 
dormant")
 }
 
 // TestSelectSegments_TimeRangeFilters: only segments overlapping the requested
@@ -699,3 +709,56 @@ func TestSelectSegments_PublicWrapper(t *testing.T) {
        require.NoError(t, err)
        require.Nil(t, got, "a closed database returns no segments")
 }
+
+// TestSelectSegments_FiltersRetentionExpiredSegment verifies that
+// SelectSegments(_, true) excludes a segment whose whole time range has passed
+// the retention deadline, and that the deadline tracks the controller's
+// injectable clock: advancing the mock clock past the TTL expires the segment
+// even though it is still on disk (retention removes it only on its next run).
+func TestSelectSegments_FiltersRetentionExpiredSegment(t *testing.T) {
+       dir := snapshotTestDir(t)
+       tsdb, sc := openSnapshotTSDB(t, dir, 3) // TTL = 3 days
+       defer func() { require.NoError(t, tsdb.Close()) }()
+
+       seg := createSegmentWithSeries(t, tsdb, snapshotTestBase, 0)
+       require.NotNil(t, seg.index)
+
+       // Clock at snapshotTestBase: the segment is within the TTL and is 
returned.
+       got, err := tsdb.SelectSegments(allTimeRange(), true)
+       require.NoError(t, err)
+       require.Len(t, got, 1, "a segment within the TTL is returned")
+       for _, g := range got {
+               g.DecRef()
+       }
+
+       // Advance the controller clock past the TTL so the segment's whole 
range is
+       // behind the retention deadline; SelectSegments must now filter it out.
+       sc.clock.(timestamp.MockClock).Add(10 * 24 * time.Hour)
+       got, err = tsdb.SelectSegments(allTimeRange(), true)
+       require.NoError(t, err)
+       require.Empty(t, got, "a fully retention-expired segment is excluded 
from query results")
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "the 
filtered segment leaks no reference")
+}
+
+// TestGetExpiredSegmentsTimeRange_TracksInjectableClock verifies that the
+// expired-segment time range is computed against the controller's injectable
+// clock: a segment within the TTL reports nothing, and advancing the mock 
clock
+// past the TTL makes it expired. (Guards against reverting to wall-clock
+// time.Now(), which would mis-expire mock-clock-dated segments.)
+func TestGetExpiredSegmentsTimeRange_TracksInjectableClock(t *testing.T) {
+       dir := snapshotTestDir(t)
+       tsdb, sc := openSnapshotTSDB(t, dir, 3) // TTL = 3 days
+       defer func() { require.NoError(t, tsdb.Close()) }()
+
+       seg := createSegmentWithSeries(t, tsdb, snapshotTestBase, 0)
+
+       // Clock at snapshotTestBase: the segment is within the TTL, nothing 
expired.
+       tr := sc.getExpiredSegmentsTimeRange()
+       require.True(t, tr.Start.IsZero(), "no segment is expired while the 
clock is within the TTL")
+
+       // Advance the clock past the TTL: the segment is now expired and 
reported.
+       sc.clock.(timestamp.MockClock).Add(10 * 24 * time.Hour)
+       tr = sc.getExpiredSegmentsTimeRange()
+       require.Equal(t, seg.Start, tr.Start, "the expired range starts at the 
now-expired segment")
+       require.Equal(t, seg.End, tr.End, "the expired range ends at the 
now-expired segment")
+}
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index 9dff036f1..ae3275d04 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -405,21 +405,17 @@ func (d *database[T, O]) collect() {
                return
        }
        d.metrics.lastTickTime.Set(float64(d.latestTickTime.Load()))
-       refCount := int32(0)
-       ss, _ := d.segmentController.segments(context.Background(), false)
-       for _, s := range ss {
-               if atomic.LoadInt32(&s.refCount) <= 0 {
-                       continue
-               }
-               tables, _ := s.Tables()
-               for _, t := range tables {
-                       t.Collect(d.segmentController.metrics)
+       // Collect metrics for every open segment. Under the dormant-refcount 
model
+       // an open segment usually has refCount==0, so we cannot rely on a 
reference
+       // bump to pin it: collectOpenMetrics gathers under the segment's read 
lock,
+       // which a concurrent reclaim (closeIfIdle takes the write lock) cannot 
race.
+       openSegments := int32(0)
+       for _, s := range d.segmentController.copySegments() {
+               if s.collectOpenMetrics(d.segmentController.metrics) {
+                       openSegments++
                }
-               s.collectMetrics()
-               s.DecRef()
-               refCount += atomic.LoadInt32(&s.refCount)
        }
-       d.totalSegRefs.Set(float64(refCount))
+       d.totalSegRefs.Set(float64(openSegments))
        if d.metrics.schedulerMetrics == nil {
                return
        }
diff --git a/banyand/internal/storage/tsdb_test.go 
b/banyand/internal/storage/tsdb_test.go
index e679a6a74..e2ac247f8 100644
--- a/banyand/internal/storage/tsdb_test.go
+++ b/banyand/internal/storage/tsdb_test.go
@@ -707,11 +707,11 @@ func TestCollectWithPartialClosedSegments(t *testing.T) {
        ss, _ = sc.segments(context.Background(), false)
        for _, s := range ss {
                if s.Start.Equal(segmentDates[0]) || 
s.Start.Equal(segmentDates[2]) {
-                       require.Equal(t, int32(0), s.refCount, "Segment should 
be closed")
+                       require.Nil(t, s.index, "Segment should be closed")
                } else {
-                       require.Greater(t, s.refCount, int32(0), "Segment 
should be open")
+                       require.NotNil(t, s.index, "Segment should be open")
                }
-               s.DecRef() // Release reference
+               s.DecRef() // no-op for dormant/closed segments
        }
 
        // Call the collect method with mixed open/closed segments
diff --git a/banyand/measure/write_data_segmentref_test.go 
b/banyand/measure/write_data_segmentref_test.go
index b71f5e742..c1a36f7eb 100644
--- a/banyand/measure/write_data_segmentref_test.go
+++ b/banyand/measure/write_data_segmentref_test.go
@@ -59,8 +59,7 @@ func TestSyncReceiver_SegmentRefOwnership(t *testing.T) {
 
        seg, err := db.CreateSegmentIfNotExist(segTime)
        require.NoError(t, err)
-       seg.DecRef()
-       seg.DecRef() // refCount -> 0 (idle-closed)
+       seg.DecRef() // the single create reference; refCount = 0 (dormant: 
open, no active reference)
        baselineOpens := openShardCount.Load()
 
        segA, err := db.CreateSegmentIfNotExist(segTime)
@@ -106,8 +105,8 @@ func TestSyncReceiver_SegmentRefOwnership(t *testing.T) {
        defer segC.DecRef()
        _, err = segC.CreateTSTableIfNotExist(common.ShardID(0))
        require.NoError(t, err)
-       require.Equal(t, int32(1), openShardCount.Load()-beforeProbe,
-               "REGRESSION: Close did not call segment.DecRef()")
+       require.Equal(t, int32(0), openShardCount.Load()-beforeProbe,
+               "after Close the dormant segment and its already-open shard are 
reused, no new shard")
 }
 
 // TestSyncChunkCallback_CreatePartHandler_StoresSegment drives the real
@@ -129,8 +128,7 @@ func 
TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) {
 
        warmup, err := db.CreateSegmentIfNotExist(segTime)
        require.NoError(t, err)
-       warmup.DecRef()
-       warmup.DecRef()
+       warmup.DecRef() // the single create reference; refCount = 0 (dormant: 
open, no active reference)
        baselineOpens := openShardCount.Load()
 
        const groupName = "test-group"
@@ -179,8 +177,8 @@ func 
TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) {
        defer segProbe.DecRef()
        _, err = segProbe.CreateTSTableIfNotExist(common.ShardID(0))
        require.NoError(t, err)
-       require.Equal(t, int32(1), openShardCount.Load()-beforeProbe,
-               "REGRESSION: Close did not call segment.DecRef()")
+       require.Equal(t, int32(0), openShardCount.Load()-beforeProbe,
+               "after Close the dormant segment and its already-open shard are 
reused, no new shard")
 }
 
 func openTestTSDBForRefTest(t *testing.T, tmpPath string, openShardCount 
*atomic.Int32) storage.TSDB[*tsTable, option] {
@@ -340,8 +338,7 @@ func TestSegmentCreateTS_ConsistencyAcrossPaths(t 
*testing.T) {
        historicalStart := ir.Standard(historicalRaw)
        histSeg, err := db.CreateSegmentIfNotExist(historicalStart)
        require.NoError(t, err)
-       histSeg.DecRef()
-       histSeg.DecRef() // refCount->0, idle-closed
+       histSeg.DecRef() // the single create reference; refCount = 0 (dormant: 
open, no active reference)
 
        // New raw datapoint with an off-grid (06:00 inside a 5-day bucket) ts.
        // liaison-side sidx pre-standardizes it; data-node-side part chunk-sync
diff --git a/banyand/stream/write_data_segmentref_test.go 
b/banyand/stream/write_data_segmentref_test.go
index 509d333b9..7b6863969 100644
--- a/banyand/stream/write_data_segmentref_test.go
+++ b/banyand/stream/write_data_segmentref_test.go
@@ -59,8 +59,7 @@ func TestSyncReceiver_SegmentRefOwnership(t *testing.T) {
 
        seg, err := db.CreateSegmentIfNotExist(segTime)
        require.NoError(t, err)
-       seg.DecRef()
-       seg.DecRef() // refCount -> 0 (idle-closed)
+       seg.DecRef() // the single create reference; refCount = 0 (dormant: 
open, no active reference)
        baselineOpens := openShardCount.Load()
 
        segA, err := db.CreateSegmentIfNotExist(segTime)
@@ -106,8 +105,8 @@ func TestSyncReceiver_SegmentRefOwnership(t *testing.T) {
        defer segC.DecRef()
        _, err = segC.CreateTSTableIfNotExist(common.ShardID(0))
        require.NoError(t, err)
-       require.Equal(t, int32(1), openShardCount.Load()-beforeProbe,
-               "REGRESSION: Close did not call segment.DecRef()")
+       require.Equal(t, int32(0), openShardCount.Load()-beforeProbe,
+               "after Close the dormant segment and its already-open shard are 
reused, no new shard")
 }
 
 // TestSyncChunkCallback_CreatePartHandler_StoresSegment drives the real
@@ -132,8 +131,7 @@ func 
TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) {
 
        warmup, err := db.CreateSegmentIfNotExist(segTime)
        require.NoError(t, err)
-       warmup.DecRef()
-       warmup.DecRef()
+       warmup.DecRef() // the single create reference; refCount = 0 (dormant: 
open, no active reference)
        baselineOpens := openShardCount.Load()
 
        const groupName = "test-group"
@@ -180,8 +178,8 @@ func 
TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) {
        defer segProbe.DecRef()
        _, err = segProbe.CreateTSTableIfNotExist(common.ShardID(0))
        require.NoError(t, err)
-       require.Equal(t, int32(1), openShardCount.Load()-beforeProbe,
-               "REGRESSION: Close did not call segment.DecRef()")
+       require.Equal(t, int32(0), openShardCount.Load()-beforeProbe,
+               "after Close the dormant segment and its already-open shard are 
reused, no new shard")
 }
 
 func openTestTSDBForRefTest(t *testing.T, tmpPath string, openShardCount 
*atomic.Int32) storage.TSDB[*tsTable, option] {
@@ -315,8 +313,7 @@ func TestSegmentCreateTS_ConsistencyAcrossPaths(t 
*testing.T) {
        historicalStart := ir.Standard(time.Date(2026, 4, 10, 0, 0, 0, 0, 
time.Local))
        histSeg, err := db.CreateSegmentIfNotExist(historicalStart)
        require.NoError(t, err)
-       histSeg.DecRef()
-       histSeg.DecRef()
+       histSeg.DecRef() // the single create reference; refCount = 0 (dormant: 
open, no active reference)
 
        rawTS := time.Date(2026, 5, 15, 6, 0, 0, 0, time.Local)
        wantNewStart := ir.Standard(rawTS)
diff --git a/banyand/trace/write_data_segmentref_test.go 
b/banyand/trace/write_data_segmentref_test.go
index d170bb3cd..6770b9c13 100644
--- a/banyand/trace/write_data_segmentref_test.go
+++ b/banyand/trace/write_data_segmentref_test.go
@@ -74,12 +74,12 @@ func TestSyncReceiver_SegmentRefOwnership(t *testing.T) {
 
        segTime := time.Date(2026, 4, 17, 0, 0, 0, 0, time.UTC)
 
-       // Drive the segment into the cold-0 "idle-closed" state: segment object
-       // still exists in sc.lst but refCount=0 and shards have been released.
+       // Drop the segment to refCount 0. Under the dormant-refcount model it 
stays
+       // open (loaded), so re-acquiring it does NOT reload shards: the open 
count
+       // stays flat across the sibling sync sessions below.
        seg, err := db.CreateSegmentIfNotExist(segTime)
        require.NoError(t, err)
-       seg.DecRef() // normal caller release    -> refCount = 1
-       seg.DecRef() // simulate idle close      -> refCount = 0
+       seg.DecRef() // the single create reference; refCount = 0 (dormant: 
open, no active reference)
        baselineOpens := openShardCount.Load()
 
        // ---------- Part A: simulate syncChunkCallback.CreatePartHandler 
(fix) ----------
@@ -104,8 +104,8 @@ func TestSyncReceiver_SegmentRefOwnership(t *testing.T) {
 
        opensAfterA := openShardCount.Load()
        require.Equal(t, int32(1), opensAfterA-baselineOpens,
-               "Part A's CreateSegmentIfNotExist should re-initialize exactly 
once "+
-                       "(segment was idle-closed, so refCount=0 -> initialize 
branch)")
+               "Part A creates shard 0 for the first time (the dormant segment 
is reused "+
+                       "without a reload, but its shard table is opened on 
demand)")
 
        // ---------- Part B: another sync session starts while A is still in 
flight ----------
        // With the fix, segA keeps refCount>=1, so Part B's 
CreateSegmentIfNotExist
@@ -146,11 +146,9 @@ func TestSyncReceiver_SegmentRefOwnership(t *testing.T) {
        require.Nil(t, partCtxB.segment,
                "REGRESSION: Close must clear syncPartContext.segment 
(`s.segment = nil`)")
 
-       // ---------- Verify Close actually DecRef'd (not just cleared the 
field) ----------
-       // After both Closes, refCount should be 0. The next 
CreateSegmentIfNotExist
-       // must therefore re-initialize and invoke TSTableCreator. If Close 
forgot
-       // to call s.segment.DecRef(), refCount would still be >=1 and this
-       // assertion would fail.
+       // ---------- After Close, the dormant segment is reused without reload 
----------
+       // Both Closes drop refCount back to 0; the segment stays open 
(dormant), so
+       // the next CreateSegmentIfNotExist re-acquires it without reloading 
shards.
        segC, err := db.CreateSegmentIfNotExist(segTime)
        require.NoError(t, err)
        defer segC.DecRef()
@@ -158,10 +156,8 @@ func TestSyncReceiver_SegmentRefOwnership(t *testing.T) {
        require.NoError(t, err)
 
        opensAfterC := openShardCount.Load()
-       require.Equal(t, int32(1), opensAfterC-opensAfterB,
-               "REGRESSION: after closing both partCtx, the next 
CreateSegmentIfNotExist did "+
-                       "not re-initialize. This means Close did not call 
segment.DecRef(). "+
-                       "Check `s.segment.DecRef()` is still in 
syncPartContext.Close.")
+       require.Equal(t, int32(0), opensAfterC-opensAfterB,
+               "the dormant segment is reused without reload after Close")
 }
 
 func openTestTSDBForRefTest(t *testing.T, tmpPath string, openShardCount 
*atomic.Int32) storage.TSDB[*tsTable, option] {
@@ -229,13 +225,12 @@ func 
TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) {
 
        segTime := time.Date(2026, 4, 17, 0, 0, 0, 0, time.Local)
 
-       // Drive the segment into the idle-closed state (refCount=0) so that the
-       // probe at the end of this test reliably exercises the `initialize`
-       // branch of incRef if and only if Close actually DecRef'd.
+       // Drop the segment to refCount 0. Under the dormant-refcount model it 
stays
+       // open (loaded), so subsequent CreateSegmentIfNotExist calls 
re-acquire it
+       // without reloading shards.
        warmup, err := db.CreateSegmentIfNotExist(segTime)
        require.NoError(t, err)
-       warmup.DecRef()
-       warmup.DecRef() // refCount -> 0
+       warmup.DecRef() // the single create reference; refCount = 0 (dormant: 
open, no active reference)
        baselineOpens := openShardCount.Load()
 
        const groupName = "test-group"
@@ -265,11 +260,11 @@ func 
TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) {
                        "partCtx struct literal and that `defer 
segment.DecRef()` has not been "+
                        "re-introduced at the top of CreatePartHandler.")
 
-       // At this point CreatePartHandler has driven refCount 0 -> 1 via 
initialize
-       // (loadShards ran -> TSTableCreator was called once).
+       // CreatePartHandler acquired the dormant segment (refCount 0 -> 1) and
+       // opened shard 0 on demand for the first time.
        afterCreate := openShardCount.Load()
        require.Equal(t, int32(1), afterCreate-baselineOpens,
-               "CreatePartHandler should have re-initialized the idle-closed 
segment once")
+               "CreatePartHandler opens shard 0 for the first time on the 
dormant segment")
 
        // CRITICAL mid-stream probe (between CreatePartHandler and Close).
        // Under the fix, partCtx holds segment.refCount >= 1, so a concurrent
@@ -307,10 +302,8 @@ func 
TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) {
        _, err = segProbe.CreateTSTableIfNotExist(common.ShardID(0))
        require.NoError(t, err)
 
-       require.Equal(t, int32(1), openShardCount.Load()-beforeProbe,
-               "REGRESSION: after partCtx.Close, the next 
CreateSegmentIfNotExist did not "+
-                       "re-initialize. This means Close did not call 
s.segment.DecRef(). "+
-                       "Check that the Close method still contains 
`s.segment.DecRef()`.")
+       require.Equal(t, int32(0), openShardCount.Load()-beforeProbe,
+               "after Close the dormant segment is reused without reload")
 }
 
 // newTestSchemaRepo builds a minimal *schemaRepo whose loadTSDB returns the
@@ -421,8 +414,7 @@ func TestSegmentCreateTS_ConsistencyAcrossPaths(t 
*testing.T) {
        historicalStart := ir.Standard(time.Date(2026, 4, 10, 0, 0, 0, 0, 
time.Local))
        histSeg, err := db.CreateSegmentIfNotExist(historicalStart)
        require.NoError(t, err)
-       histSeg.DecRef()
-       histSeg.DecRef()
+       histSeg.DecRef() // the single create reference; refCount = 0 (dormant: 
open, no active reference)
 
        rawTS := time.Date(2026, 5, 15, 6, 0, 0, 0, time.Local)
        wantNewStart := ir.Standard(rawTS)

Reply via email to