This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 1b245e633 Enhance segment lifecycle (#1151)
1b245e633 is described below
commit 1b245e633c624aa7ffa8afcfa355e90c88bd235c
Author: mrproliu <[email protected]>
AuthorDate: Wed Jun 3 13:59:28 2026 +0800
Enhance segment lifecycle (#1151)
* Enhance segment lifecycle
---
CHANGES.md | 1 +
banyand/internal/storage/index.go | 6 +-
banyand/internal/storage/rotation.go | 2 +-
banyand/internal/storage/segment.go | 317 ++++-----
.../internal/storage/segment_idle_reclaim_test.go | 708 +++++++++++++++++++++
.../storage/segment_lifecycle_integration_test.go | 218 +++++++
banyand/internal/storage/segment_test.go | 121 ++--
.../storage/snapshot_closed_segment_test.go | 121 +++-
banyand/internal/storage/tsdb.go | 22 +-
banyand/internal/storage/tsdb_test.go | 6 +-
banyand/measure/write_data_segmentref_test.go | 17 +-
banyand/stream/write_data_segmentref_test.go | 17 +-
banyand/trace/write_data_segmentref_test.go | 50 +-
13 files changed, 1316 insertions(+), 290 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index ccb1a0ed5..49262221e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -58,6 +58,7 @@ Release Notes.
- Support displaying a measure's indexed tags in the dump tool, resolved per
part so peak memory is bounded by the part rather than a segment-wide series
map.
- Snapshot/backup and data inspection no longer reopen idle-closed segments,
avoiding cold-segment nil-index panics and index lock-file churn.
- Add opt-in vectorized measure query tracing over raw-frame distributed
queries, including a trace envelope and fixed trace-label vocabulary.
+- Enhance segment lifecycle: `refCount` now counts only active users,
decoupled from "open" (`index != nil`), adding a "dormant" state (open,
`refCount == 0`). A `DecRef` to zero no longer closes a segment; idle reclaim
and retention delete act only at `refCount == 0`, so an in-flight
snapshot/inspect is no longer torn down mid-operation (fixing the cold-node
nil-index panic and bluge lock churn) while idle segments still release their
bluge writers.
### Bug Fixes
diff --git a/banyand/internal/storage/index.go
b/banyand/internal/storage/index.go
index 3ff0ae41c..5b9f17737 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -37,9 +37,9 @@ import (
const seriesIndexDirName = "sidx"
func (s *segment[T, O]) IndexDB() IndexDB {
- // Snapshot s.index into a local: the field can be flipped to nil by
- // performCleanup at any time, so a second read could see a different
- // value. Returning the local also avoids boxing a nil *seriesIndex
+ // Snapshot s.index into a local: the field can be flipped to nil by a
+ // concurrent reclaim (closeIfIdle / performDelete) at any time, so a second
+ // read could see a different value. Returning the local also avoids boxing a nil *seriesIndex
// into a typed-nil IndexDB interface, which would defeat the
// `if indexDB == nil` guard on the caller side.
idx := s.index
diff --git a/banyand/internal/storage/rotation.go
b/banyand/internal/storage/rotation.go
index 2b680ddbb..332e1880a 100644
--- a/banyand/internal/storage/rotation.go
+++ b/banyand/internal/storage/rotation.go
@@ -107,7 +107,7 @@ func (d *database[T, O]) startRotationTask() error {
}()
for i := range ss {
if ss[i].End.UnixNano() < ts {
- ss[i].index.store.Reset()
+ ss[i].resetIndex()
}
}
latest := ss[len(ss)-1]
diff --git a/banyand/internal/storage/segment.go
b/banyand/internal/storage/segment.go
index 7872cef22..0c941e0c8 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -113,6 +113,11 @@ func (sc *segmentController[T, O]) openSegment(ctx
context.Context, startTime, e
}
s.l = logger.Fetch(ctx, s.String())
s.lastAccessed.Store(time.Now().UnixNano())
+ // New segments start dormant: resources open (index!=nil) but
refCount==0,
+ // so an idle, unused segment is reclaimable without any caller
releasing a
+ // baseline reference.
+ s.mu.Lock()
+ defer s.mu.Unlock()
return s, s.initialize(ctx)
}
@@ -159,61 +164,44 @@ func (s *segment[T, O]) TablesWithShardIDs() (tt []T,
shardIDs []common.ShardID,
return tt, shardIDs, cc
}
-// incRef acquires one reference on the segment, opening it via initialize
-// if it is currently closed (refCount<=0). On return, the caller is
-// guaranteed that the segment is alive (s.index != nil, shards loaded) and
-// will stay alive until the matching DecRef.
+// incRef acquires one active reference on the segment, opening its resources
+// (series index + shards) via acquire if it is currently closed. On return the
+// segment is guaranteed alive (s.index != nil) until the matching DecRef.
//
-// incRef intentionally does NOT touch lastAccessed: the idle-segment
-// reclaimer (closeIdleSegments) decides eligibility from that field, and
-// the rotation-tick scan iterates every segment via incRef on each tick.
-// Refreshing lastAccessed here would defeat the reclaimer for any segment
-// outside the active write window. Callers that represent a real read or
-// write touch must bump lastAccessed explicitly (see selectSegments and
+// refCount counts only active users and is orthogonal to whether the segment
is
+// open (index != nil): a segment with refCount==0 but index!=nil is "dormant"
+// -- open and reusable, yet eligible for the idle reclaimer. incRef therefore
+// does NOT touch lastAccessed: the reclaimer (closeIdleSegments) decides
+// eligibility from that field and the rotation-tick scan iterates every
segment
+// via incRef on each tick, so refreshing it here would defeat the reclaimer.
+// Real read/write touches bump lastAccessed explicitly (see selectSegments and
// createSegment); housekeeping iterators must not.
//
-// The CAS loop closes a TOCTOU race that the older "Load + AddInt32"
-// shape suffered from: a goroutine could read refCount=1, race against a
-// concurrent DecRef that drives it to 0 and triggers performCleanup
-// (which sets s.index=nil and tears down shards), and then AddInt32 it
-// back to 1. The caller would walk away with refCount=1 -- formally a
-// valid reference -- but pointing at an already-cleaned-up segment, so
-// every subsequent IndexDB() / Tables() call would observe zero state.
-//
-// The CAS variant retries whenever a concurrent DecRef beats us to the
-// counter: if CAS fails, we re-Load and either succeed at the new value
-// or fall through to initialize() to reopen the segment under s.mu.
-// Symmetric to the CAS loop in DecRef, which guarantees the dual: a
-// DecRef that drives refCount to 0 sees no concurrent +1 sneak in
-// before performCleanup runs.
+// The fast path CAS-bumps only while refCount>0, so it can never resurrect a
+// segment the idle reclaimer is closing (closeIfIdle / performDelete act only
+// at refCount==0 under s.mu). When refCount<=0 it falls to the s.mu slow path
+// (acquire), which serializes the reopen and the 0->1 transition against them.
func (s *segment[T, O]) incRef(ctx context.Context) error {
for {
current := atomic.LoadInt32(&s.refCount)
if current <= 0 {
- // Either the segment was never opened or DecRef just
drove
- // refCount to 0; reopen under the mutex.
- return s.initialize(ctx)
+ // Dormant (index!=nil) or closed (index==nil);
reopen/acquire under
+ // the mutex.
+ return s.acquire(ctx)
}
- // CAS so a concurrent DecRef cannot flip refCount to 0 and run
- // performCleanup between our Load and our increment. On failure
- // we re-Load and re-evaluate the branch.
+ // CAS bumps only while refCount>0, so it can never resurrect a
segment
+ // the idle reclaimer is closing (closeIfIdle acts only at
refCount==0).
if atomic.CompareAndSwapInt32(&s.refCount, current, current+1) {
return nil
}
}
}
+// initialize opens the segment's series index and shards if not already
+// open, leaving the segment dormant (index!=nil) WITHOUT changing refCount.
+// Must be called with s.mu held.
func (s *segment[T, O]) initialize(ctx context.Context) error {
- s.mu.Lock()
- defer s.mu.Unlock()
-
- if atomic.LoadInt32(&s.refCount) > 0 {
- // Another goroutine reopened the segment while we waited for
the
- // mutex. Add the +1 our caller (incRef) skipped before entering
- // the slow path; otherwise concurrent reopens would share one
- // refCount and the first DecRef would cleanup while we are
still
- // using it.
- atomic.AddInt32(&s.refCount, 1)
+ if s.index != nil {
return nil
}
@@ -234,85 +222,145 @@ func (s *segment[T, O]) initialize(ctx context.Context)
error {
s.index = nil
return errors.Wrap(errOpenDatabase, errors.WithMessage(err,
"load shards failed").Error())
}
- atomic.StoreInt32(&s.refCount, 1)
s.l.Info().Stringer("seg", s).Msg("segment initialized")
return nil
}
-func (s *segment[T, O]) collectMetrics() {
+// acquire is the slow path of incRef: under s.mu it opens the segment's
+// resources via initialize if closed, then adds the caller's reference (0->1).
+// Holding s.mu serializes the reopen and the 0->1 transition against
+// closeIfIdle / performDelete, which act only at refCount==0 under the same
lock.
+func (s *segment[T, O]) acquire(ctx context.Context) error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if atomic.LoadInt32(&s.refCount) > 0 {
+ // Another goroutine acquired the segment while we waited for
the mutex.
+ atomic.AddInt32(&s.refCount, 1)
+ return nil
+ }
+ if atomic.LoadUint32(&s.mustBeDeleted) != 0 {
+ // Flagged for deletion with no active reference: performDelete
has run
+ // (or is about to, blocked on s.mu) and the directory may
already be
+ // gone. Refuse to reopen so we never open resources on a
removed dir.
+ // A still-referenced (refCount>0) flagged segment is handled
by the
+ // fast-bump above, since its directory stays until the last
DecRef.
+ return ErrSegmentClosed
+ }
+ if err := s.initialize(ctx); err != nil {
+ return err
+ }
+ atomic.StoreInt32(&s.refCount, 1)
+ return nil
+}
+
+// resetIndex resets the series-index cache if the segment is open, holding
+// s.mu.RLock so the index pointer read is synchronized against a concurrent
+// close (which clears it under the write lock).
+func (s *segment[T, O]) resetIndex() {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+ if s.index != nil {
+ s.index.store.Reset()
+ }
+}
+
+// collectOpenMetrics gathers the segment's shard-table and series-index
metrics
+// if it is open, holding s.mu.RLock so a concurrent reclaim (which takes the
+// write lock) cannot tear the resources down mid-collection. Returns whether
+// the segment was open.
+func (s *segment[T, O]) collectOpenMetrics(shardMetrics Metrics) bool {
s.mu.RLock()
defer s.mu.RUnlock()
if s.index == nil {
- return
+ return false
+ }
+ if sLst := s.sLst.Load(); sLst != nil {
+ for _, sh := range *sLst {
+ sh.table.Collect(shardMetrics)
+ }
}
s.index.store.CollectMetrics(s.index.p.SegLabelValues()...)
+ return true
}
+// DecRef releases one active reference. Reaching refCount==0 does NOT close
the
+// segment -- it becomes dormant (open, reclaimable by the idle reclaimer). The
+// sole exception: a segment flagged for deletion (mustBeDeleted) while held
runs
+// its deferred delete on the last DecRef.
func (s *segment[T, O]) DecRef() {
- shouldCleanup := false
-
- if atomic.LoadInt32(&s.refCount) <= 0 &&
atomic.LoadUint32(&s.mustBeDeleted) != 0 {
- shouldCleanup = true
- } else {
- for {
- current := atomic.LoadInt32(&s.refCount)
- if current <= 0 {
- return
- }
-
- if atomic.CompareAndSwapInt32(&s.refCount, current,
current-1) {
- shouldCleanup = current == 1
- break
+ for {
+ current := atomic.LoadInt32(&s.refCount)
+ if current <= 0 {
+ // Already dormant; nothing to release. Deletion of a
dormant
+ // segment is driven by delete()/performDelete, not
here.
+ return
+ }
+ if atomic.CompareAndSwapInt32(&s.refCount, current, current-1) {
+ if current == 1 && atomic.LoadUint32(&s.mustBeDeleted) != 0 {
+ s.performDelete()
}
+ return
}
}
-
- if !shouldCleanup {
- return
- }
-
- s.performCleanup()
}
-func (s *segment[T, O]) performCleanup() {
- s.mu.Lock()
- defer s.mu.Unlock()
-
- if atomic.LoadInt32(&s.refCount) > 0 &&
atomic.LoadUint32(&s.mustBeDeleted) == 0 {
- return
- }
-
- deletePath := ""
- if atomic.LoadUint32(&s.mustBeDeleted) != 0 {
- deletePath = s.location
- }
-
+// closeResourcesLocked closes the segment's series index and shards but keeps
+// its on-disk directory, leaving it closed (index==nil) and reopenable. Must
be
+// called with s.mu held. Idempotent.
+func (s *segment[T, O]) closeResourcesLocked() {
if s.index != nil {
if err := s.index.Close(); err != nil {
s.l.Panic().Err(err).Msg("failed to close the series
index")
}
s.index = nil
}
-
- sLst := s.sLst.Load()
- if sLst != nil {
+ if sLst := s.sLst.Load(); sLst != nil {
for _, shard := range *sLst {
shard.close()
}
- if deletePath == "" {
- s.sLst.Store(&[]*shard[T]{})
- }
+ s.sLst.Store(&[]*shard[T]{})
+ }
+}
+
+// closeIfIdle releases a dormant segment's resources (index writer + shards)
if
+// it is open, unreferenced, not flagged for deletion, and idle past the
+// threshold. The directory is kept so the segment reopens on next access.
+// Returns whether it closed the segment.
+func (s *segment[T, O]) closeIfIdle(idleThreshold int64) bool {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.index == nil || atomic.LoadInt32(&s.refCount) != 0 ||
+ atomic.LoadUint32(&s.mustBeDeleted) != 0 || s.lastAccessed.Load() >= idleThreshold {
+ return false
}
+ s.closeResourcesLocked()
+ return true
+}
- if deletePath != "" {
- s.lfs.MustRMAll(deletePath)
+// performDelete closes the segment's resources and removes its directory. It
+// runs once the last active reference is dropped, or immediately from delete()
+// when the segment is already dormant. Idempotent. Must NOT be called with
s.mu
+// held.
+func (s *segment[T, O]) performDelete() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if atomic.LoadInt32(&s.refCount) > 0 {
+ // A reference was acquired before we ran; the next DecRef to
zero
+ // retries the delete.
+ return
}
+ s.closeResourcesLocked()
+ s.lfs.MustRMAll(s.location)
}
+// delete flags the segment for deletion. If it is dormant (no active
reference)
+// the delete happens now; otherwise it is deferred to the last DecRef.
func (s *segment[T, O]) delete() {
atomic.StoreUint32(&s.mustBeDeleted, 1)
- s.DecRef()
+ if atomic.LoadInt32(&s.refCount) == 0 {
+ s.performDelete()
+ }
}
// Location returns the on-disk directory of the segment.
@@ -327,8 +375,8 @@ func (s *segment[T, O]) Location() string {
func (s *segment[T, O]) SeriesIndexStats() (int64, int64) {
s.mu.RLock()
if s.index != nil {
- // Hold the read lock while reading live stats so a concurrent
- // performCleanup cannot close the index out from under us.
+ // Hold the read lock while reading live stats so a concurrent
reclaim
+ // (closeIfIdle / performDelete) cannot close the index out
from under us.
defer s.mu.RUnlock()
return s.index.Stats()
}
@@ -613,8 +661,17 @@ func (sc *segmentController[T, O]) segments(ctx
context.Context, reopenClosed bo
return nil, err
}
} else {
- if atomic.LoadInt32(&sc.lst[i].refCount) > 0 {
- atomic.AddInt32(&sc.lst[i].refCount, 1)
+ // Pin only if already open (CAS while refCount>0); a
dormant/closed
+ // segment is returned unbumped so the idle reclaimer
is never raced
+ // into resurrecting it, and the caller's DecRef on it
is a no-op.
+ for {
+ current := atomic.LoadInt32(&sc.lst[i].refCount)
+ if current <= 0 {
+ break
+ }
+ if atomic.CompareAndSwapInt32(&sc.lst[i].refCount, current, current+1) {
+ break
+ }
}
}
r[i] = sc.lst[i]
@@ -634,62 +691,24 @@ func (sc *segmentController[T, O]) copySegments()
[]*segment[T, O] {
return r
}
+// closeIdleSegments releases the resources of every dormant, idle segment
+// (index!=nil, refCount==0, not flagged for deletion, lastAccessed past the
+// idle threshold), keeping their directories so they reopen on next access.
+// Each segment is decided under its own s.mu, where the refCount==0 check is
+// stable against concurrent acquire (which takes the same lock to go 0->1).
func (sc *segmentController[T, O]) closeIdleSegments() int {
- maxIdleTime := sc.idleTimeout
-
- now := time.Now().UnixNano()
- idleThreshold := now - maxIdleTime.Nanoseconds()
-
- // Take our own snapshot rather than going through segments(false): that
- // helper's "Load>0 then Add" bump is non-atomic and can resurrect a
- // segment that DecRef just cleaned up, and its caller has no way to
tell
- // which entries it actually bumped. closeIdleSegments must know,
because
- // it issues an unconditional DecRef per entry to release the bump; if a
- // concurrent selectSegments reopens an entry we didn't bump, that
DecRef
- // would drop the query's only live ref and trigger performCleanup under
- // active use. Iterate sc.lst under RLock and CAS-bump each open
segment;
- // closed segments are recorded as "not bumped" and skipped in the loop.
+ idleThreshold := time.Now().UnixNano() - sc.idleTimeout.Nanoseconds()
sc.RLock()
- segs := make([]*segment[T, O], 0, len(sc.lst))
- bumped := make([]bool, 0, len(sc.lst))
- refAtBump := make([]int32, 0, len(sc.lst))
- for _, s := range sc.lst {
- didBump := false
- var snapRef int32
- for {
- current := atomic.LoadInt32(&s.refCount)
- if current <= 0 {
- break
- }
- if atomic.CompareAndSwapInt32(&s.refCount, current,
current+1) {
- didBump = true
- snapRef = current
- break
- }
- }
- segs = append(segs, s)
- bumped = append(bumped, didBump)
- refAtBump = append(refAtBump, snapRef)
- }
+ segs := make([]*segment[T, O], len(sc.lst))
+ copy(segs, sc.lst)
sc.RUnlock()
closedCount := 0
- for i, seg := range segs {
- if !bumped[i] {
- continue
- }
- // Only close when the snapshot proved refCount==1 (baseline
only):
- // a higher value means other callers hold active references,
and
- // decrementing past our bump would steal one of their refs,
- // potentially triggering performCleanup under active use on a
- // subsequent tick.
- if refAtBump[i] == 1 && seg.lastAccessed.Load() < idleThreshold
{
- seg.DecRef()
+ for _, s := range segs {
+ if s.closeIfIdle(idleThreshold) {
closedCount++
}
- seg.DecRef()
}
-
return closedCount
}
@@ -890,11 +909,11 @@ func (sc *segmentController[T, O]) remove(deadline
time.Time) (hasSegment bool,
// Queries should exclude such fully expired segments to avoid serving
TTL-expired
// data; partially expired segments remain visible until their end passes the
deadline.
func (sc *segmentController[T, O]) getRetentionDeadline() time.Time {
- return time.Now().Local().Add(-sc.getOptions().TTL.estimatedDuration())
+ return sc.clock.Now().Local().Add(-sc.getOptions().TTL.estimatedDuration())
}
func (sc *segmentController[T, O]) getExpiredSegmentsTimeRange()
*timestamp.TimeRange {
- deadline := time.Now().Local().Add(-sc.opts.TTL.estimatedDuration())
+ deadline := sc.clock.Now().Local().Add(-sc.opts.TTL.estimatedDuration())
timeRange := &timestamp.TimeRange{
IncludeStart: true,
IncludeEnd: false,
@@ -913,7 +932,7 @@ func (sc *segmentController[T, O])
getExpiredSegmentsTimeRange() *timestamp.Time
}
func (sc *segmentController[T, O]) deleteExpiredSegments(segmentSuffixes
[]string) int64 {
- deadline := time.Now().Local().Add(-sc.opts.TTL.estimatedDuration())
+ deadline := sc.clock.Now().Local().Add(-sc.opts.TTL.estimatedDuration())
var count int64
ss, _ := sc.segments(context.Background(), false)
sc.l.Info().Str("segment_suffixes", fmt.Sprintf("%s", segmentSuffixes)).
@@ -958,7 +977,7 @@ func (sc *segmentController[T, O]) removeSeg(segID
segmentID) {
}
// peekOldestSegmentEndTime returns the end time of the oldest segment.
-// It returns the zero time and false if no segments exist or all segments
have refCount <= 0.
+// It returns the zero time and false if no segments exist or the oldest
segment is closed.
func (sc *segmentController[T, O]) peekOldestSegmentEndTime() (time.Time,
bool) {
sc.RLock()
defer sc.RUnlock()
@@ -971,8 +990,10 @@ func (sc *segmentController[T, O])
peekOldestSegmentEndTime() (time.Time, bool)
// so the first one is the oldest
oldest := sc.lst[0]
- // Only return segments that are still active (have references > 0)
- if atomic.LoadInt32(&oldest.refCount) > 0 {
+ oldest.mu.RLock()
+ open := oldest.index != nil
+ oldest.mu.RUnlock()
+ if open {
return oldest.End, true
}
@@ -1004,10 +1025,16 @@ func (sc *segmentController[T, O]) removeOldest()
(bool, error) {
func (sc *segmentController[T, O]) close() {
sc.Lock()
defer sc.Unlock()
+ // Full shutdown: release every segment's resources regardless of
refCount
+ // (DecRef no longer closes on reaching zero). A segment flagged for
deletion
+ // also has its directory removed; the rest keep their data on disk.
for _, s := range sc.lst {
- for atomic.LoadInt32(&s.refCount) > 0 {
- s.DecRef()
+ s.mu.Lock()
+ s.closeResourcesLocked()
+ if atomic.LoadUint32(&s.mustBeDeleted) != 0 {
+ s.lfs.MustRMAll(s.location)
}
+ s.mu.Unlock()
}
sc.lst = sc.lst[:0]
if sc.metrics != nil {
diff --git a/banyand/internal/storage/segment_idle_reclaim_test.go
b/banyand/internal/storage/segment_idle_reclaim_test.go
new file mode 100644
index 000000000..10d95e869
--- /dev/null
+++ b/banyand/internal/storage/segment_idle_reclaim_test.go
@@ -0,0 +1,708 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+ "context"
+ "os"
+ "path/filepath"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// These tests pin the two opposing invariants that any fix for the cold-node
+// snapshot panic / inspect flock must satisfy simultaneously:
+//
+// 1. An actively-held segment (one a caller obtained a live reference to,
+// e.g. an in-flight snapshot or query) must NEVER be reclaimed by the
+// idle reclaimer or torn down by retention. Violating this is the root
+// cause of the production nil-index panic in the snapshot path and the
+// "exclusive lock" flock churn.
+//
+// 2. A genuinely idle segment with no active reference must STILL be
+// reclaimed so its bluge index writer is released -- the leak PR #1128
+// (https://github.com/apache/skywalking-banyandb/pull/1128) fixed by
+// removing the lastAccessed refresh from incRef.
+//
+// The "Keeps" / "Delete" tests reproduce the current bug (they FAIL on the
+// unfixed tree). The "Reclaims" / "DoesNotRefreshLastAccessed" tests guard the
+// PR #1128 behavior (they PASS on the unfixed tree and must keep passing).
+
+// newReclaimTestController builds a segment controller with the given idle
+// timeout, mirroring the setup shared across segment_test.go.
+func newReclaimTestController(
+ t *testing.T, tempDir string, idleTimeout time.Duration,
+) (*segmentController[mockTSTable, mockTSTableOpener], context.Context) {
+ t.Helper()
+ l := logger.GetLogger("test-idle-reclaim")
+ ctx := context.WithValue(context.Background(), logger.ContextKey, l)
+ ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+ return common.Position{Database: "test-db", Stage: "test-stage"}
+ })
+ opts := TSDBOpts[mockTSTable, mockTSTableOpener]{
+ TSTableCreator: func(_ fs.FileSystem, _ string, _
common.Position, _ *logger.Logger,
+ _ timestamp.TimeRange, _ mockTSTableOpener, _ any,
+ ) (mockTSTable, error) {
+ return mockTSTable{ID: common.ShardID(0)}, nil
+ },
+ ShardNum: 2,
+ SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+ TTL: IntervalRule{Unit: DAY, Num: 7},
+ SeriesIndexFlushTimeoutSeconds: 10,
+ SeriesIndexCacheMaxBytes: 1024 * 1024,
+ }
+ serviceCache := NewServiceCache().(*serviceCache)
+ sc := newSegmentController[mockTSTable, mockTSTableOpener](
+ ctx,
+ tempDir,
+ l,
+ opts,
+ nil, // indexMetrics
+ nil, // metrics
+ idleTimeout,
+ fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"), opts.MemoryLimit),
+ serviceCache,
+ group,
+ )
+ return sc, ctx
+}
+
+// openReclaimTestSegment opens a single day segment and registers it with the
+// controller's segment list, matching the helper inlined in segment_test.go.
+func openReclaimTestSegment(
+ ctx context.Context, t *testing.T, sc *segmentController[mockTSTable,
mockTSTableOpener], tempDir string, start time.Time,
+) *segment[mockTSTable, mockTSTableOpener] {
+ t.Helper()
+ segmentPath := filepath.Join(tempDir,
"segment-"+start.Format(dayFormat))
+ require.NoError(t, os.MkdirAll(segmentPath, DirPerm))
+ require.NoError(t, os.WriteFile(filepath.Join(segmentPath,
metadataFilename), []byte(currentVersion), FilePerm))
+ seg, err := sc.openSegment(ctx, start, start.Add(24*time.Hour),
segmentPath, start.Format(dayFormat), sc.groupCache)
+ require.NoError(t, err)
+ sc.Lock()
+ sc.lst = append(sc.lst, seg)
+ sc.sortLst()
+ sc.Unlock()
+ return seg
+}
+
+func reclaimTestDay(t *testing.T) time.Time {
+ t.Helper()
+ now := time.Now().UTC()
+ return time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0,
time.UTC)
+}
+
+// TestCloseIdleSegments_KeepsActivelyHeldReopenedSegment reproduces the cold
+// node snapshot panic. A snapshot reopens an idle-closed cold segment through
+// segments(ctx,true) (the call in the snapshot path) and holds the reference
+// for the duration of the snapshot. Because that reopen yields refCount==1 --
+// which the old reclaimer could not distinguish from an idle segment -- and
+// segments(ctx,true) does not refresh lastAccessed, the idle reclaimer
reclaims
+// the segment mid-snapshot and sets index=nil. The held caller then
dereferences
+// seg.index.store and panics.
+//
+// Invariant under test (model-agnostic): while a live reference is held, the
+// segment's index must stay open. FAILS on the unfixed tree.
+func TestCloseIdleSegments_KeepsActivelyHeldReopenedSegment(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+ sc, ctx := newReclaimTestController(t, tempDir, 100*time.Millisecond)
+
+ seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+
+ // 1) Drive the segment into the idle-closed state every cold segment
lives
+ // in: stale lastAccessed -> reclaimed -> index writer released.
+ seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+ require.Equal(t, 1, sc.closeIdleSegments())
+ require.Nil(t, seg.index, "precondition: the cold segment must start
idle-closed")
+
+ // 2) The snapshot path reopens every segment via segments(ctx,true) and
+ // holds the references until the snapshot finishes (see
+ // (*database).TakeFileSnapshot).
+ held, err := sc.segments(ctx, true)
+ require.NoError(t, err)
+ require.Len(t, held, 1)
+ defer func() {
+ for _, s := range held {
+ s.DecRef()
+ }
+ }()
+ require.NotNil(t, seg.index, "the snapshot path must reopen the cold
segment")
+
+ // 3) The 10-minute idle reclaimer tick fires while the snapshot is
still
+ // holding the segment. (lastAccessed is still stale because
+ // segments(ctx,true)/incRef intentionally does not refresh it.)
+ sc.closeIdleSegments()
+
+ // 4) The held segment MUST still be open. On the unfixed tree it has
been
+ // reclaimed (index==nil); the next seg.index.store access -- e.g. the
+ // hard-link in the snapshot path -- then panics with a nil pointer
dereference.
+ require.NotNil(t, seg.index,
+ "the idle reclaimer must not close a segment that the snapshot
path is actively holding")
+}
+
+// TestDelete_KeepsIndexWhileActivelyHeld reproduces the second face of the
same
+// defect: before the fix, retention's delete() tore the index down (and
removed
+// the segment directory) even while a caller held a live reference, because
the
+// teardown bypassed the refCount>0 guard once mustBeDeleted was set. The
holder
+// was left with a use-after-free. Under the new model delete() defers to the
+// last DecRef while the segment is held.
+//
+// This is an isolated, deterministic reproduction of the segment-level defect;
+// in production it is hit as a race between retention and an in-flight reopen
+// on a cold node. FAILS on the unfixed tree.
+func TestDelete_KeepsIndexWhileActivelyHeld(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+ sc, ctx := newReclaimTestController(t, tempDir, 100*time.Millisecond)
+
+ seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+
+ // Reduce to the cold-segment shape where a reopen yields the lone
reference
+ // (refCount==1): idle-close, then reopen as an active holder.
+ seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+ require.Equal(t, 1, sc.closeIdleSegments())
+ require.Nil(t, seg.index)
+
+ require.NoError(t, seg.incRef(ctx)) // active holder, e.g. an in-flight
query/snapshot
+ require.NotNil(t, seg.index)
+
+ // Retention marks the segment for deletion while the holder is still
using
+ // it.
+ seg.delete()
+
+ require.NotNil(t, seg.index,
+ "delete() must not close the index while a caller still holds a
reference")
+ require.DirExists(t, seg.location,
+ "delete() must not remove the segment directory while a caller
still holds a reference")
+
+ // Once the holder releases its reference, deletion proceeds.
+ seg.DecRef()
+ require.Nil(t, seg.index, "the index must be released once the last
reference is dropped")
+ require.NoDirExists(t, seg.location, "the directory must be removed
once the last reference is dropped")
+}
+
+// TestCloseIdleSegments_ReclaimsTrulyIdleSegment guards the PR #1128 goal: a
+// segment that is open but has no active reference and has gone idle must be
+// reclaimed so its bluge index writer is released. Passes on the unfixed tree
+// and must keep passing after the fix.
+func TestCloseIdleSegments_ReclaimsTrulyIdleSegment(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+ sc, ctx := newReclaimTestController(t, tempDir, 100*time.Millisecond)
+
+ seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+ require.NotNil(t, seg.index)
+
+ // No extra reference is held; only the segment's own open state keeps
it
+ // alive. Make it idle.
+ seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+
+ require.Equal(t, 1, sc.closeIdleSegments(), "an idle, unreferenced
segment must be reclaimed")
+ require.Nil(t, seg.index, "a reclaimed segment must release its index
writer (PR #1128)")
+}
+
+// TestIncRef_DoesNotRefreshLastAccessed pins the invariant PR #1128
established:
+// incRef -- used by the rotation housekeeping scan that touches every segment
+// via segments(ctx,true) on each tick -- must NOT refresh lastAccessed. If it
+// did, no segment outside the active write window would ever look idle and the
+// reclaimer could never release their bluge writers, reintroducing the exact
+// leak #1128 fixed.
+//
+// This guards against the tempting-but-wrong fix of re-adding
+// lastAccessed.Store(now) inside incRef to paper over the reclaim race.
+func TestIncRef_DoesNotRefreshLastAccessed(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+ sc, ctx := newReclaimTestController(t, tempDir, 100*time.Millisecond)
+
+ seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+
+ stale := time.Now().Add(-time.Hour).UnixNano()
+ seg.lastAccessed.Store(stale)
+
+ require.NoError(t, seg.incRef(ctx))
+ defer seg.DecRef()
+
+ require.Equal(t, stale, seg.lastAccessed.Load(),
+ "incRef must not refresh lastAccessed (would defeat the idle reclaimer and reintroduce PR #1128's writer leak)")
+}
+
+// A freshly opened segment starts dormant -- open (index!=nil) with no
+// active reference (refCount==0).
+func TestOpenSegment_StartsDormant(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+ sc, ctx := newReclaimTestController(t, tempDir, time.Hour)
+
+ seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+ require.NotNil(t, seg.index, "a freshly opened segment is open")
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "a freshly opened segment has no active reference")
+}
+
+// A dormant but freshly-accessed segment (refCount==0, lastAccessed recent)
+// must NOT be reclaimed by the idle reclaimer.
+func TestCloseIdleSegments_KeepsFreshDormantSegment(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+ sc, ctx := newReclaimTestController(t, tempDir, time.Hour)
+
+ seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
+
+ require.Equal(t, 0, sc.closeIdleSegments(), "a fresh dormant segment must not be reclaimed")
+ require.NotNil(t, seg.index, "a fresh dormant segment must stay open")
+}
+
+// DecRef to zero leaves the segment dormant (open) and reusable; a
+// subsequent incRef reuses it without reopening.
+func TestDecRef_ToZeroLeavesDormantAndReusable(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+ sc, ctx := newReclaimTestController(t, tempDir, time.Hour)
+
+ seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+ require.NoError(t, seg.incRef(ctx))
+ require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount))
+
+ seg.DecRef()
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
+ require.NotNil(t, seg.index, "DecRef to zero keeps the segment open (dormant)")
+
+ require.NoError(t, seg.incRef(ctx))
+ require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount), "incRef reuses the dormant segment")
+ require.NotNil(t, seg.index)
+ seg.DecRef()
+}
+
+// A reclaimed (idle-closed) segment reopens on the next incRef.
+func TestReclaimedSegment_ReopensOnIncRef(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+ sc, ctx := newReclaimTestController(t, tempDir, 100*time.Millisecond)
+
+ seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+ seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+ require.Equal(t, 1, sc.closeIdleSegments())
+ require.Nil(t, seg.index)
+
+ require.NoError(t, seg.incRef(ctx))
+ require.NotNil(t, seg.index, "a reclaimed segment reopens on incRef")
+ require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount))
+ seg.DecRef()
+}
+
+// Deleting a dormant segment (no active reference) closes it and removes its
+// directory immediately.
+func TestDelete_DormantSegmentDeletedImmediately(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+ sc, ctx := newReclaimTestController(t, tempDir, time.Hour)
+
+ seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
+
+ seg.delete()
+ require.Nil(t, seg.index, "deleting a dormant segment closes it immediately")
+ require.NoDirExists(t, seg.location, "deleting a dormant segment removes its directory immediately")
+}
+
+// deleteExpiredSegments defers the actual delete of a segment that a caller
+// is actively holding until the last reference is dropped.
+func TestDeleteExpiredSegments_DefersWhileHeld(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+ sc, ctx := newReclaimTestController(t, tempDir, time.Hour)
+
+ seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+ require.NoError(t, seg.incRef(ctx)) // an in-flight query/snapshot holds it
+
+ require.Equal(t, int64(1), sc.deleteExpiredSegments([]string{seg.suffix}))
+ require.NotNil(t, seg.index, "deletion must be deferred while a reference is held")
+ require.DirExists(t, seg.location, "the directory must survive while a reference is held")
+
+ seg.DecRef()
+ require.Nil(t, seg.index, "the index is released once the last reference is dropped")
+ require.NoDirExists(t, seg.location, "the directory is removed once the last reference is dropped")
+}
+
+// Many open/reclaim cycles must not leak the bluge writer lock -- every
+// reopen (which acquires the exclusive directory lock) must keep succeeding.
+func TestReopenCycles_NoWriterLeak(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+ sc, ctx := newReclaimTestController(t, tempDir, 100*time.Millisecond)
+
+ seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+ for i := 0; i < 8; i++ {
+ require.NoError(t, seg.incRef(ctx), "round %d: reopen must succeed (writer lock not leaked)", i)
+ require.NotNil(t, seg.index)
+ seg.DecRef()
+ seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+ require.True(t, seg.closeIfIdle(time.Now().UnixNano()), "round %d: reclaim", i)
+ require.Nil(t, seg.index)
+ }
+}
+
+// Under concurrent incRef/DecRef racing a tight closeIdleSegments
+// loop, an acquired reference must ALWAYS observe an open index (the reclaimer
+// never steals an active reference / leaves refCount>=1 with index==nil), no
+// goroutine panics, and the terminal refCount is conserved at 0. Run with -race.
+func TestConcurrent_IncRefVsCloseIdle_NoStolenRefOrPanic(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+ // Nanosecond idle timeout: the reclaimer considers the segment idle on every
+ // tick, maximizing the race against acquirers.
+ sc, ctx := newReclaimTestController(t, tempDir, time.Nanosecond)
+ defer sc.close()
+ seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+
+ // A fixed per-acquirer cycle count (rather than a wall-clock duration) keeps
+ // the race deterministic across machines: every run exercises the same number
+ // of incRef/DecRef vs closeIdleSegments interleavings, so a slow or single-core
+ // CPU only makes the test slower -- it never makes it pass vacuously or fail.
+ const (
+ acquirers = 8
+ cyclesPerAcq = 2000
+ )
+ var (
+ acquirersWG sync.WaitGroup
+ reclaimerWG sync.WaitGroup
+ reclaimDone = make(chan struct{})
+ violations int64
+ panics int64
+ )
+ reclaimerWG.Add(1)
+ go func() {
+ defer reclaimerWG.Done()
+ defer func() {
+ if r := recover(); r != nil {
+ atomic.AddInt64(&panics, 1)
+ }
+ }()
+ for {
+ select {
+ case <-reclaimDone:
+ return
+ default:
+ }
+ sc.closeIdleSegments()
+ }
+ }()
+ for i := 0; i < acquirers; i++ {
+ acquirersWG.Add(1)
+ go func() {
+ defer acquirersWG.Done()
+ defer func() {
+ if r := recover(); r != nil {
+ atomic.AddInt64(&panics, 1)
+ }
+ }()
+ for c := 0; c < cyclesPerAcq; c++ {
+ if err := seg.incRef(ctx); err != nil {
+ atomic.AddInt64(&violations, 1)
+ return
+ }
+ // Invariant: a held reference implies an open index.
+ seg.mu.RLock()
+ if seg.index == nil {
+ atomic.AddInt64(&violations, 1)
+ }
+ seg.mu.RUnlock()
+ seg.DecRef()
+ }
+ }()
+ }
+ acquirersWG.Wait()
+ close(reclaimDone)
+ reclaimerWG.Wait()
+
+ require.Equal(t, int64(0), atomic.LoadInt64(&panics), "no goroutine may panic")
+ require.Equal(t, int64(0), atomic.LoadInt64(&violations),
+ "a held reference must always see an open index (no stolen ref / nil index)")
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "terminal refCount must be conserved at 0")
+}
+
+// The idle reclaimer must refuse to close a segment flagged for deletion,
+// so it never races performDelete's directory removal. closeIfIdle leaves such a
+// segment for delete()/performDelete to tear down.
+func TestCloseIfIdle_RefusesWhenMustBeDeleted(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+ sc, ctx := newReclaimTestController(t, tempDir, time.Hour)
+
+ seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+ atomic.StoreUint32(&seg.mustBeDeleted, 1)
+ seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano()) // also idle
+
+ require.False(t, seg.closeIfIdle(time.Now().UnixNano()),
+ "closeIfIdle must refuse a segment flagged for deletion")
+ require.NotNil(t, seg.index, "a flagged segment is left for performDelete, not idle-closed")
+}
+
+// Once a dormant segment is deleted (its directory removed by
+// performDelete), incRef must NOT reopen it -- the acquire slow path returns
+// ErrSegmentClosed rather than opening resources on the now-missing directory.
+func TestIncRef_AfterDelete_ReturnsErrSegmentClosed(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+ sc, ctx := newReclaimTestController(t, tempDir, time.Hour)
+
+ seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+ seg.delete()
+ require.Nil(t, seg.index)
+ require.NoDirExists(t, seg.location)
+
+ require.ErrorIs(t, seg.incRef(ctx), ErrSegmentClosed,
+ "incRef must refuse to reopen a deleted segment")
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "a refused acquire adds no reference")
+}
+
+// A DecRef on a segment that is already closed (refCount==0, index==nil),
+// e.g. a stale release from a caller that obtained the segment before the idle
+// reclaimer closed it, is a harmless no-op and never drives refCount negative.
+func TestDecRef_OnClosedSegment_IsNoOp(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+ sc, ctx := newReclaimTestController(t, tempDir, time.Hour)
+
+ seg := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+ seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+ require.True(t, seg.closeIfIdle(time.Now().UnixNano()))
+ require.Nil(t, seg.index)
+
+ seg.DecRef()
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "DecRef on a closed segment must not underflow")
+ require.Nil(t, seg.index, "DecRef must not resurrect a closed segment")
+}
+
+// peekOldestSegmentEndTime reports the oldest segment's end time only while
+// that segment is open (index loaded), preserving the pre-dormant-model behavior
+// that gated on refCount>0. An idle-closed oldest segment is reported as absent.
+func TestPeekOldestSegmentEndTime_OnlyWhileOpen(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+ sc, ctx := newReclaimTestController(t, tempDir, time.Hour)
+
+ // Keep-one rule: with no segment, or a single segment, peek reports absent.
+ _, ok := sc.peekOldestSegmentEndTime()
+ require.False(t, ok, "no segments -> not reported")
+
+ day1 := reclaimTestDay(t)
+ oldest := openReclaimTestSegment(ctx, t, sc, tempDir, day1)
+ _, ok = sc.peekOldestSegmentEndTime()
+ require.False(t, ok, "a single segment is not reported (keep-one rule)")
+
+ openReclaimTestSegment(ctx, t, sc, tempDir, day1.Add(24*time.Hour)) // a newer segment; peek needs len>1
+
+ // Open (dormant) oldest: its end time is reported.
+ end, ok := sc.peekOldestSegmentEndTime()
+ require.True(t, ok, "an open oldest segment must be reported")
+ require.Equal(t, oldest.End, end)
+
+ // Idle-close the oldest: it must now be reported as absent.
+ oldest.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+ require.True(t, oldest.closeIfIdle(time.Now().UnixNano()))
+ require.Nil(t, oldest.index)
+ _, ok = sc.peekOldestSegmentEndTime()
+ require.False(t, ok, "a closed oldest segment must not be reported")
+
+ // Reopen on access: it is reported again.
+ require.NoError(t, oldest.incRef(ctx))
+ end, ok = sc.peekOldestSegmentEndTime()
+ require.True(t, ok, "a reopened oldest segment is reported again")
+ require.Equal(t, oldest.End, end)
+ oldest.DecRef()
+}
+
+// Startup: the controller's disk-scan startup path (open()) loads every
+// on-disk segment in the DORMANT state -- open (index!=nil) but with refCount==0
+// -- which the pre-refactor model could not express (it loaded segments with
+// refCount==1). A loaded segment must be immediately usable (a real read pins
+// and releases it) and reclaimable once idle.
+func TestControllerOpen_LoadsSegmentsDormant(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+ sc, _ := newReclaimTestController(t, tempDir, time.Hour)
+
+ day1 := reclaimTestDay(t)
+ day2 := day1.Add(24 * time.Hour)
+ for _, d := range []time.Time{day1, day2} {
+ segPath := filepath.Join(tempDir, "seg-"+d.Format(dayFormat))
+ require.NoError(t, os.MkdirAll(segPath, DirPerm))
+ require.NoError(t, os.WriteFile(filepath.Join(segPath, metadataFilename), []byte(currentVersion), FilePerm))
+ }
+
+ require.NoError(t, sc.open())
+ require.Len(t, sc.lst, 2)
+
+ // Startup invariant: every loaded segment is open yet dormant.
+ for _, s := range sc.lst {
+ require.NotNil(t, s.index, "a freshly loaded segment must be open")
+ require.Equal(t, int32(0), atomic.LoadInt32(&s.refCount), "a freshly loaded segment must be dormant")
+ }
+
+ // Usable: a real read reopens/pins each segment, and the matching DecRef
+ // returns it to dormant (still open, refCount back to 0).
+ tr := timestamp.NewInclusiveTimeRange(day1, day2.Add(24*time.Hour))
+ got, err := sc.selectSegments(tr, true)
+ require.NoError(t, err)
+ require.Len(t, got, 2)
+ for _, s := range got {
+ s.DecRef()
+ }
+ for _, s := range sc.lst {
+ require.Equal(t, int32(0), atomic.LoadInt32(&s.refCount), "back to dormant after a read/release cycle")
+ require.NotNil(t, s.index, "still open after a read/release cycle")
+ }
+
+ // Reclaimable: once idle, the reclaimer closes every loaded segment.
+ for _, s := range sc.lst {
+ s.lastAccessed.Store(time.Now().Add(-2 * time.Hour).UnixNano())
+ }
+ require.Equal(t, 2, sc.closeIdleSegments())
+ for _, s := range sc.lst {
+ require.Nil(t, s.index, "a loaded segment is reclaimable to closed")
+ }
+}
+
+// Shutdown: the controller's close() releases the resources of EVERY
+// segment regardless of refCount (DecRef no longer closes on reaching zero), so
+// it must force-close even an actively-held segment. A segment flagged for
+// deletion additionally has its directory removed; the rest keep their data.
+func TestControllerClose_ReleasesAllAndDeletesFlagged(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+ sc, ctx := newReclaimTestController(t, tempDir, time.Hour)
+
+ day1 := reclaimTestDay(t)
+ dormant := openReclaimTestSegment(ctx, t, sc, tempDir, day1)
+ held := openReclaimTestSegment(ctx, t, sc, tempDir, day1.Add(24*time.Hour))
+ flagged := openReclaimTestSegment(ctx, t, sc, tempDir, day1.Add(48*time.Hour))
+
+ require.NoError(t, held.incRef(ctx)) // an active reference survives into shutdown
+ require.Equal(t, int32(1), atomic.LoadInt32(&held.refCount))
+ atomic.StoreUint32(&flagged.mustBeDeleted, 1)
+ dormantLoc, heldLoc, flaggedLoc := dormant.location, held.location, flagged.location
+
+ sc.close()
+
+ require.Empty(t, sc.lst, "controller segment list is cleared on shutdown")
+ require.Nil(t, dormant.index, "dormant segment resources are released on shutdown")
+ require.Nil(t, held.index, "an actively-held segment is force-closed on shutdown")
+ require.Nil(t, flagged.index, "flagged segment resources are released on shutdown")
+
+ // Only the flagged segment's directory is removed.
+ require.DirExists(t, dormantLoc, "a normal segment keeps its data on disk")
+ require.DirExists(t, heldLoc, "a held segment keeps its data on disk")
+ require.NoDirExists(t, flaggedLoc, "a segment flagged for deletion is removed on shutdown")
+}
+
+// removeOldest (disk-pressure retention) deletes the oldest dormant segment
+// outright -- closing it and removing its directory immediately via the new
+// delete() path -- while keeping the rest.
+func TestRemoveOldest_DeletesDormantOldestKeepsRest(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+ sc, ctx := newReclaimTestController(t, tempDir, time.Hour)
+
+ day1 := reclaimTestDay(t)
+ oldest := openReclaimTestSegment(ctx, t, sc, tempDir, day1)
+ newer := openReclaimTestSegment(ctx, t, sc, tempDir, day1.Add(24*time.Hour))
+ oldestLoc := oldest.location
+
+ removed, err := sc.removeOldest()
+ require.NoError(t, err)
+ require.True(t, removed)
+ require.Len(t, sc.lst, 1)
+ require.Equal(t, newer.id, sc.lst[0].id, "the newer segment is kept")
+ require.Nil(t, oldest.index, "the removed oldest is closed")
+ require.NoDirExists(t, oldestLoc, "a dormant oldest segment's directory is removed immediately")
+}
+
+// removeOldest honors the keep-one rule -- it never deletes the last
+// remaining segment.
+func TestRemoveOldest_KeepOneRule(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+ sc, ctx := newReclaimTestController(t, tempDir, time.Hour)
+
+ only := openReclaimTestSegment(ctx, t, sc, tempDir, reclaimTestDay(t))
+ removed, err := sc.removeOldest()
+ require.NoError(t, err)
+ require.False(t, removed, "the last remaining segment must never be removed")
+ require.Len(t, sc.lst, 1)
+ require.NotNil(t, only.index, "the kept segment stays open")
+}
+
+// removeOldest defers the actual teardown of the oldest segment while a
+// caller still holds a reference: it leaves the list immediately, but its index
+// and directory survive until the last DecRef.
+func TestRemoveOldest_DefersWhileOldestHeld(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+ sc, ctx := newReclaimTestController(t, tempDir, time.Hour)
+
+ day1 := reclaimTestDay(t)
+ oldest := openReclaimTestSegment(ctx, t, sc, tempDir, day1)
+ openReclaimTestSegment(ctx, t, sc, tempDir, day1.Add(24*time.Hour))
+ require.NoError(t, oldest.incRef(ctx)) // an in-flight reader holds the oldest
+ oldestLoc := oldest.location
+
+ removed, err := sc.removeOldest()
+ require.NoError(t, err)
+ require.True(t, removed)
+ require.Len(t, sc.lst, 1, "the oldest is dropped from the list immediately")
+ require.NotNil(t, oldest.index, "deletion is deferred while a reference is held")
+ require.DirExists(t, oldestLoc, "the directory survives while held")
+
+ oldest.DecRef()
+ require.Nil(t, oldest.index)
+ require.NoDirExists(t, oldestLoc, "the directory is removed once the last reference drops")
+}
+
+// getExpiredSegmentsTimeRange spans exactly the segments older than the TTL
+// deadline, and -- going through segments(false) -- must not leave a leaked
+// reference on any scanned dormant segment.
+func TestGetExpiredSegmentsTimeRange(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+ sc, ctx := newReclaimTestController(t, tempDir, time.Hour) // opts.TTL = 7 days
+
+ today := reclaimTestDay(t)
+ expired := openReclaimTestSegment(ctx, t, sc, tempDir, today.Add(-10*24*time.Hour))
+ openReclaimTestSegment(ctx, t, sc, tempDir, today) // fresh, not expired
+
+ tr := sc.getExpiredSegmentsTimeRange()
+ require.Equal(t, expired.Start, tr.Start, "range starts at the expired segment")
+ require.Equal(t, expired.End, tr.End, "range ends at the expired segment")
+
+ for _, s := range sc.lst {
+ require.Equal(t, int32(0), atomic.LoadInt32(&s.refCount),
+ "scanning the expired range via segments(false) must not leak references")
+ require.NotNil(t, s.index, "scanning must not close any segment")
+ }
+}
diff --git a/banyand/internal/storage/segment_lifecycle_integration_test.go b/banyand/internal/storage/segment_lifecycle_integration_test.go
new file mode 100644
index 000000000..0e21a7941
--- /dev/null
+++ b/banyand/internal/storage/segment_lifecycle_integration_test.go
@@ -0,0 +1,218 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+ "path/filepath"
+ "strconv"
+ "sync"
+ "sync/atomic"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/skywalking-banyandb/pkg/index/inverted"
+)
+
+// TestSegmentLifecycle_EndToEnd is a full-flow integration test that drives the
+// entire segment lifecycle through the public TSDB API against a real series
+// index (which persists to disk), asserting the dormant-refcount invariants and
+// on-disk durability at every transition:
+//
+// create+write -> active(refCount==1) -> DecRef(dormant, still open) ->
+// query reopen/pin/release -> snapshot via live state ->
+// idle reclaim (flush to disk, release writer lock) ->
+// closed-segment stats + snapshot (read-only / hard-link, NO reopen) ->
+// reopen on read -> restart (Close + reopen, data survives on disk) ->
+// retention delete (directory removed).
+func TestSegmentLifecycle_EndToEnd(t *testing.T) {
+ dir := snapshotTestDir(t)
+ const seriesCount = 12
+ tr := allTimeRange()
+
+ tsdb, sc := openSnapshotTSDB(t, dir, 3)
+
+ // 1. Create + write: a fresh segment is open and actively referenced.
+ s, err := tsdb.CreateSegmentIfNotExist(snapshotTestBase)
+ require.NoError(t, err)
+ seg := s.(*segment[*MockTSTable, any])
+ require.NotNil(t, seg.index, "a freshly created segment is open")
+ require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount), "create yields exactly one active reference")
+ require.NoError(t, s.IndexDB().Insert(buildTestSeriesDocs(t, seriesCount)))
+
+ // 2. Release to dormant: DecRef to zero keeps the segment OPEN (the
+ // dormant-refcount invariant -- it does not close), and the data stays readable.
+ s.DecRef()
+ require.NotNil(t, seg.index, "DecRef to zero leaves the segment open (dormant), not closed")
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
+ count, size := seg.SeriesIndexStats()
+ require.Equal(t, int64(seriesCount), count, "data is readable while dormant")
+ require.Positive(t, size)
+
+ // 3. Query path: a real read reopens-if-needed, pins the segment, and the
+ // matching DecRef returns it to dormant (still open).
+ got, err := tsdb.SelectSegments(tr, true)
+ require.NoError(t, err)
+ require.Len(t, got, 1)
+ require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount), "a query pins the segment")
+ for _, g := range got {
+ g.DecRef()
+ }
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "query released back to dormant")
+ require.NotNil(t, seg.index)
+
+ // 4. Snapshot while open (dormant): goes through the live-state path, must
+ // not close the segment, and captures every document.
+ openSnap := filepath.Join(t.TempDir(), "snap-open")
+ created, err := tsdb.TakeFileSnapshot(openSnap)
+ require.NoError(t, err)
+ require.True(t, created)
+ require.NotNil(t, seg.index, "snapshot must not close an open segment")
+ require.Equal(t, int64(seriesCount), snapshotSeriesDocCount(t, openSnap, seg), "open-path snapshot holds all docs")
+
+ // 5. Idle reclaim: closes the segment, flushing the index to disk and
+ // releasing the bluge exclusive writer lock.
+ idleClose(t, sc, seg)
+ require.NoFileExists(t, filepath.Join(seg.location, seriesIndexDirName, inverted.LockFilename),
+ "idle-close releases the bluge writer lock (bluge.pid)")
+
+ // 6. Closed segment: stats are read from disk read-only and must NOT reopen
+ // the writable index.
+ count, size = seg.SeriesIndexStats()
+ require.Equal(t, int64(seriesCount), count, "closed-segment data is read from disk")
+ require.Positive(t, size)
+ require.Nil(t, seg.index, "reading stats must not reopen a closed segment")
+
+ // 7. Snapshot while closed: hard-link path, identical content, still no reopen.
+ closedSnap := filepath.Join(t.TempDir(), "snap-closed")
+ created, err = tsdb.TakeFileSnapshot(closedSnap)
+ require.NoError(t, err)
+ require.True(t, created)
+ require.Nil(t, seg.index, "snapshot must not reopen a closed segment")
+ require.Equal(t, int64(seriesCount), snapshotSeriesDocCount(t, closedSnap, seg),
+ "closed-path snapshot captures the same docs as the open-path snapshot")
+
+ // 8. Reopen on access: a real read reopens the closed segment.
+ got, err = tsdb.SelectSegments(tr, true)
+ require.NoError(t, err)
+ require.Len(t, got, 1)
+ require.NotNil(t, seg.index, "a read reopens a closed segment")
+ for _, g := range got {
+ g.DecRef()
+ }
+
+ // 9. Restart: closing the DB flushes everything; reopening reloads the
+ // segment dormant and the series data survives on disk.
+ require.NoError(t, tsdb.Close())
+ tsdb2, sc2 := openSnapshotTSDB(t, dir, 3)
+ require.Len(t, sc2.lst, 1, "the segment is reloaded from disk on restart")
+ reloaded := sc2.lst[0]
+ require.NotNil(t, reloaded.index, "a reloaded segment is open")
+ require.Equal(t, int32(0), atomic.LoadInt32(&reloaded.refCount), "a reloaded segment starts dormant")
+ count, _ = reloaded.SeriesIndexStats()
+ require.Equal(t, int64(seriesCount), count, "series data survives a restart on disk")
+
+ // 10. Retention: removeOldest honors keep-one, so add a newer segment, then
+ // delete the oldest -- its directory is removed from disk.
+ _ = createSegmentWithSeries(t, tsdb2, snapshotDay(1), 3)
+ oldestLoc := reloaded.location
+ removed, err := sc2.removeOldest()
+ require.NoError(t, err)
+ require.True(t, removed)
+ require.NoDirExists(t, oldestLoc, "retention removes the oldest segment's directory")
+ require.Len(t, sc2.lst, 1, "the newer segment remains after retention")
+
+ require.NoError(t, tsdb2.Close())
+}
+
+// TestSegment_PostDeleteOperations_NoPanic pins the safety of operations that
+// can still run against a segment AFTER it has been deleted (its directory
+// removed): inspect (SeriesIndexStats + CollectClosedShardInfo) and backup
+// (snapshotInto). Under the dormant-refcount model a dormant segment is NOT
+// pinned by the stats path, so these can race a concurrent delete that tears
+// down resources and removes the directory. All such operations must degrade
+// gracefully (no panic, best-effort zero results), never read a removed dir
+// fatally. Run with -race.
+func TestSegment_PostDeleteOperations_NoPanic(t *testing.T) {
+ dir := snapshotTestDir(t)
+ tsdb, _, seg := openSnapshotTestTSDB(t, dir)
+ defer func() { _ = tsdb.Close() }()
+
+ // Real on-disk data so inspect/snapshot do meaningful work before deletion.
+ require.NoError(t, seg.IndexDB().Insert(buildTestSeriesDocs(t, 8)))
+
+ snapBase := t.TempDir()
+ const (
+ readers = 4
+ snapshotters = 2
+ cycles = 200
+ )
+ var (
+ wg sync.WaitGroup
+ ready sync.WaitGroup
+ panics int64
+ snapSeq int64
+ )
+ ready.Add(readers + snapshotters)
+ // Each worker runs a fixed number of cycles (no wall-clock timing) and signals
+ // `ready` after its first iteration. The delete fires only once every worker
+ // has run at least once, so it is guaranteed to race operations that are still
+ // looping -- deterministically, on any CPU, with no sleep a slow machine could
+ // under- or over-shoot.
+ guard := func(fn func()) {
+ defer wg.Done()
+ defer func() {
+ if r := recover(); r != nil {
+ atomic.AddInt64(&panics, 1)
+ }
+ }()
+ for c := 0; c < cycles; c++ {
+ fn()
+ if c == 0 {
+ ready.Done()
+ }
+ }
+ }
+
+ // Inspect readers.
+ for i := 0; i < readers; i++ {
+ wg.Add(1)
+ go guard(func() {
+ _, _ = seg.SeriesIndexStats()
+ _, _ = CollectClosedShardInfo(seg.Location())
+ })
+ }
+ // Backup writers, each to its own destination.
+ for i := 0; i < snapshotters; i++ {
+ wg.Add(1)
+ go guard(func() {
+ dst := filepath.Join(snapBase, "snap-"+strconv.FormatInt(atomic.AddInt64(&snapSeq, 1), 10))
+ _, _ = seg.snapshotInto(dst)
+ })
+ }
+
+ ready.Wait()
+ seg.delete() // races the operations that are still looping
+ wg.Wait()
+
+ require.Equal(t, int64(0), atomic.LoadInt64(&panics),
+ "inspect/snapshot operations on a deleted segment must not panic")
+ require.NoDirExists(t, seg.location,
+ "once no operation holds a reference, the deleted segment's directory is gone")
+ require.Nil(t, seg.index, "the deleted segment is closed")
+}
diff --git a/banyand/internal/storage/segment_test.go b/banyand/internal/storage/segment_test.go
index 20175520b..640b34eb9 100644
--- a/banyand/internal/storage/segment_test.go
+++ b/banyand/internal/storage/segment_test.go
@@ -140,21 +140,26 @@ func TestSegmentOpenAndReopen(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, segment)
- // Verify segment is open
- assert.Greater(t, segment.refCount, int32(0))
+ // A freshly opened segment is dormant: resources open (index!=nil) with no
+ // active reference (refCount==0).
+ assert.NotNil(t, segment.index)
+ assert.Equal(t, int32(0), segment.refCount)
- // Close segment by decrementing reference count
- initialRefCount := segment.refCount
+ // Acquire an active reference, then release it: DecRef to zero leaves the
+ // segment dormant (open), it does NOT close.
+ require.NoError(t, segment.incRef(ctx))
+ assert.Equal(t, int32(1), segment.refCount)
segment.DecRef()
+ assert.Equal(t, int32(0), segment.refCount)
+ assert.NotNil(t, segment.index)
- // Verify segment is closed (refCount reduced)
- assert.Equal(t, initialRefCount-1, segment.refCount)
-
- // Reopen segment
- segment.incRef(ctx)
-
- // Verify segment is properly reopened
- assert.Equal(t, initialRefCount, segment.refCount)
+ // Idle-close it (resources released), then incRef reopens it.
+ segment.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+ require.True(t, segment.closeIfIdle(time.Now().UnixNano()))
+ assert.Nil(t, segment.index)
+ require.NoError(t, segment.incRef(ctx))
+ assert.Equal(t, int32(1), segment.refCount)
+ defer segment.DecRef()
// Verify we can still access segment data
assert.NotNil(t, segment.index)
@@ -234,17 +239,22 @@ func TestSegmentCloseIfIdle(t *testing.T) {
segment, err := sc.openSegment(ctx, startTime, endTime, segmentPath, suffix, sc.groupCache)
require.NoError(t, err)
- // Force last access time to be in the past
+ // A freshly opened segment is dormant (open, refCount==0).
+ require.NotNil(t, segment.index)
+ require.Equal(t, int32(0), segment.refCount)
+
+ // Force last access time to be in the past so the idle reclaimer closes it.
segment.lastAccessed.Store(time.Now().Add(-time.Minute).UnixNano())
- // Close if idle should succeed
- segment.DecRef()
+ // closeIfIdle releases the dormant, idle segment's resources.
+ require.True(t, segment.closeIfIdle(time.Now().UnixNano()))
// Verify segment is closed
assert.Nil(t, segment.index)
// Test reopening the segment
- segment.incRef(ctx)
+ require.NoError(t, segment.incRef(ctx))
+ defer segment.DecRef()
// Verify segment is properly reopened
assert.NotNil(t, segment.index)
@@ -339,10 +349,10 @@ func TestCloseIdleAndSelectSegments(t *testing.T) {
// Verify we have three segments
require.Len(t, sc.lst, 3)
- // Make sure all segments have reference counts > 0
- require.Greater(t, seg1.refCount, int32(0))
- require.Greater(t, seg2.refCount, int32(0))
- require.Greater(t, seg3.refCount, int32(0))
+ // All segments start dormant: open (index!=nil) with no active reference.
+ require.NotNil(t, seg1.index)
+ require.NotNil(t, seg2.index)
+ require.NotNil(t, seg3.index)
// Force segments 1 and 3 to be idle (setting last accessed time in the past)
seg1.lastAccessed.Store(time.Now().Add(-time.Second).UnixNano())
@@ -590,9 +600,9 @@ func TestDeleteExpiredSegmentsWithClosedSegments(t
*testing.T) {
segments = append(segments, segment)
}
- // Verify all segments are initially open
+ // Verify all segments are initially open (dormant: index!=nil,
refCount==0)
for i, seg := range segments {
- assert.Greater(t, seg.refCount, int32(0), "Segment %d should be
open", i)
+ assert.NotNil(t, seg.index, "Segment %d should be open", i)
}
// Set the "lastAccessed" time for some segments to make them idle
@@ -614,12 +624,12 @@ func TestDeleteExpiredSegmentsWithClosedSegments(t
*testing.T) {
closedCount := sc.closeIdleSegments()
assert.Equal(t, 3, closedCount, "Should have closed 3 segments")
- // Verify segments 0, 2, and 4 are closed
- assert.Equal(t, int32(0), segments[0].refCount, "Segment 0 should be
closed")
+ // Verify segments 0, 2, and 4 are closed (index released)
+ assert.Nil(t, segments[0].index, "Segment 0 should be closed")
assert.NotNil(t, segments[1].index, "Segment 1 should remain open")
- assert.Equal(t, int32(0), segments[2].refCount, "Segment 2 should be
closed")
+ assert.Nil(t, segments[2].index, "Segment 2 should be closed")
assert.NotNil(t, segments[3].index, "Segment 3 should remain open")
- assert.Equal(t, int32(0), segments[4].refCount, "Segment 4 should be
closed")
+ assert.Nil(t, segments[4].index, "Segment 4 should be closed")
assert.NotNil(t, segments[5].index, "Segment 5 should remain open")
// Now delete expired segments
@@ -917,10 +927,11 @@ func TestSegment_IndexDB_ReturnsUntypedNilWhenClosed(t
*testing.T) {
startTime.Format(dayFormat), sc.groupCache)
require.NoError(t, err)
- // Idle-close: drop the only reference so performCleanup runs and the
- // segment ends up in the residual state seen on cold-tier nodes
- // (refCount=0, s.index=nil, but segment still reachable for reopen).
- seg.DecRef()
+ // Idle-close: the reclaimer releases the dormant segment's resources
so it
+ // ends up in the residual state seen on cold-tier nodes (refCount=0,
+ // s.index=nil, but segment still reachable for reopen).
+ seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+ require.True(t, seg.closeIfIdle(time.Now().UnixNano()))
require.Nil(t, seg.index)
require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
@@ -949,18 +960,17 @@ func TestSegment_IndexDB_ReturnsUntypedNilWhenClosed(t
*testing.T) {
// TestSegment_ConcurrentReopen_RefCountConsistent stress-tests the
// reference-count contract of segment.incRef under concurrent reopen.
-// Before the fix, segment.initialize() returned without AddInt32 when
+// Before the fix, the reopen slow path returned without AddInt32 when
// another goroutine had already reopened the segment, so the second caller
// "owned" a reference it had never accounted for. The first DecRef would
-// then trigger performCleanup while another goroutine was still using the
-// segment.
+// then tear the segment down while another goroutine was still using it.
//
// After the fix, every concurrent incRef must add exactly one reference
// regardless of which path it takes.
//
// Reliability note: a single 64-goroutine round can occasionally serialize
// well enough that only the first caller enters the slow path
-// (initialize) and the rest take the fast path (CAS+1). That serialized
+// (acquire) and the rest take the fast path (CAS+1). That serialized
// shape would also pass on the buggy code. To make the test deterministic
// across machines, the experiment is repeated for `rounds` cycles, each
// starting from the residual state. The bug becomes effectively
@@ -1010,8 +1020,9 @@ func TestSegment_ConcurrentReopen_RefCountConsistent(t
*testing.T) {
startTime.Format(dayFormat), sc.groupCache)
require.NoError(t, err)
- // Bring the segment into the cold-tier residual state.
- seg.DecRef()
+ // Bring the segment into the cold-tier residual (closed) state.
+ seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+ require.True(t, seg.closeIfIdle(time.Now().UnixNano()))
require.Nil(t, seg.index)
require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
@@ -1048,16 +1059,21 @@ func TestSegment_ConcurrentReopen_RefCountConsistent(t
*testing.T) {
"round %d: every concurrent incRef must add exactly one
reference", r)
require.NotNil(t, seg.index, "round %d: segment.index must be
reopened", r)
- // Drain references; the last DecRef must trigger performCleanup
- // and bring the segment back to the residual state for the next
- // round.
+ // Drain references. DecRef to zero leaves the segment dormant
(open);
+ // the idle reclaimer then closes it, restoring the residual
state for
+ // the next round.
for i := 0; i < N; i++ {
seg.DecRef()
}
require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount),
"round %d", r)
+ require.NotNil(t, seg.index,
+ "round %d: draining to zero leaves the segment dormant
(open)", r)
+ seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+ require.True(t, seg.closeIfIdle(time.Now().UnixNano()),
+ "round %d: the reclaimer must close the dormant
segment", r)
require.Nil(t, seg.index,
- "round %d: segment.index must be cleared by
performCleanup", r)
+ "round %d: segment.index must be cleared by the
reclaimer", r)
}
}
@@ -1065,14 +1081,15 @@ func TestSegment_ConcurrentReopen_RefCountConsistent(t
*testing.T) {
// reopen pattern under heavy concurrency: many goroutines repeatedly call
// selectSegments, exercise IndexDB().Stats() on each returned segment, and
// release with DecRef. With idle-closed segments as the starting state,
-// each iteration triggers initialize() to reopen the segment and the final
-// DecRef triggers performCleanup() to close it again, so closes and
-// reopens are interleaved across goroutines.
+// the first access to a segment triggers acquire()/initialize() to reopen it;
+// DecRef leaves it dormant (open, refCount==0) rather than closing it, so
+// later accesses re-acquire the open segment while releases interleave.
//
// Verifies (run with -race):
// - no goroutine panics (covers both the typed-nil and refcount fixes)
// - no data race is detected on the segment lifecycle
-// - all segments end up cleanly closed (refCount == 0, index == nil)
+// - no segment leaks an active reference (refCount == 0 at the end)
+// - every dormant segment is then reclaimable to closed via the idle
reclaimer
func TestSegment_ConcurrentReopenAndClose_NoPanic(t *testing.T) {
tempDir, cleanup := setupTestEnvironment(t)
defer cleanup()
@@ -1123,8 +1140,9 @@ func TestSegment_ConcurrentReopenAndClose_NoPanic(t
*testing.T) {
sc.lst = append(sc.lst, seg)
sc.sortLst()
sc.Unlock()
- // Drop the open reference so each segment starts in the
residual state.
- seg.DecRef()
+ // Force each segment into the residual (closed) state to start.
+ seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+ require.True(t, seg.closeIfIdle(time.Now().UnixNano()))
require.Nil(t, seg.index)
}
@@ -1174,10 +1192,19 @@ func TestSegment_ConcurrentReopenAndClose_NoPanic(t
*testing.T) {
sc.RLock()
final := append([]*segment[mockTSTable, mockTSTableOpener]{}, sc.lst...)
sc.RUnlock()
+ // No goroutine may leak an active reference.
for _, s := range final {
require.Equal(t, int32(0), atomic.LoadInt32(&s.refCount),
"segment %s leaked references", s.suffix)
- require.Nil(t, s.index, "segment %s should be cleaned up",
s.suffix)
+ }
+ // After the churn the segments are dormant (open, refCount==0); the
idle
+ // reclaimer must be able to close every one of them cleanly.
+ for _, s := range final {
+ s.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+ }
+ sc.closeIdleSegments()
+ for _, s := range final {
+ require.Nil(t, s.index, "segment %s should be reclaimable to
closed", s.suffix)
}
}
diff --git a/banyand/internal/storage/snapshot_closed_segment_test.go
b/banyand/internal/storage/snapshot_closed_segment_test.go
index 11b6be406..5c4719dad 100644
--- a/banyand/internal/storage/snapshot_closed_segment_test.go
+++ b/banyand/internal/storage/snapshot_closed_segment_test.go
@@ -88,6 +88,10 @@ func openSnapshotTSDB(t *testing.T, dir string, ttlDays int)
(TSDB[*MockTSTable,
TSTableCreator: MockTSTableCreator,
SegmentIdleTimeout: time.Hour,
}
+ // Pin the controller's clock to snapshotTestBase so retention's
deadline is
+ // computed from the same clock the segments are created against;
otherwise the
+ // fixed-2024 segments would look retention-expired against the real
wall clock
+ // and SelectSegments would filter them out.
mc := timestamp.NewMockClock()
mc.Set(snapshotTestBase)
ctx := timestamp.SetClock(context.Background(), mc)
@@ -103,7 +107,7 @@ func openSnapshotTestTSDB(t *testing.T, dir string)
(TSDB[*MockTSTable, any], *s
tsdb, sc := openSnapshotTSDB(t, dir, 3)
seg, err := tsdb.CreateSegmentIfNotExist(snapshotTestBase)
require.NoError(t, err)
- seg.DecRef() // leave it at its open baseline
+ seg.DecRef() // release the create ref, leaving it open but dormant
(refCount==0)
require.Len(t, sc.lst, 1)
return tsdb, sc, sc.lst[0]
}
@@ -195,7 +199,7 @@ func TestSeriesIndexStats_ClosedSegmentIsNotReopened(t
*testing.T) {
defer func() { require.NoError(t, tsdb.Close()) }()
// Write a known number of series into the open segment's index, then
drive
- // it idle-closed (performCleanup flushes the index to disk on close).
+ // it idle-closed (closeIfIdle flushes the index to disk on close).
const seriesCount = 10
require.NoError(t, seg.IndexDB().Insert(buildTestSeriesDocs(t,
seriesCount)))
@@ -308,7 +312,7 @@ func snapshotDay(i int) time.Time {
// createSegmentWithSeries creates (via the controller) the segment containing
// ts, inserts n series documents into its series index, and returns the
-// internal segment left at its open baseline (the create reference is
released).
+// internal segment left open but dormant (the create reference is released).
func createSegmentWithSeries(t *testing.T, tsdb TSDB[*MockTSTable, any], ts
time.Time, n int) *segment[*MockTSTable, any] {
t.Helper()
s, err := tsdb.CreateSegmentIfNotExist(ts)
@@ -529,8 +533,9 @@ func idleClose(t *testing.T, sc
*segmentController[*MockTSTable, any], seg *segm
}
// TestSelectSegments_ReopenTrue_ReopensClosedSegment: reopenClosed=true on a
-// CLOSED segment reopens it (incRef -> initialize), restores refCount to 1,
and
-// refreshes lastAccessed (a real read keeps the segment hot).
+// CLOSED segment reopens it (incRef -> acquire), takes refCount to 1, and
+// refreshes lastAccessed (a real read keeps the segment hot). DecRef to zero
+// then leaves it dormant (open), not closed.
func TestSelectSegments_ReopenTrue_ReopensClosedSegment(t *testing.T) {
dir := snapshotTestDir(t)
tsdb, sc := newEmptySnapshotTSDB(t, dir)
@@ -544,36 +549,38 @@ func TestSelectSegments_ReopenTrue_ReopensClosedSegment(t
*testing.T) {
require.NoError(t, err)
require.Len(t, got, 1)
require.NotNil(t, seg.index, "reopenClosed=true must reopen a closed
segment")
- require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount), "reopen
reinitializes refCount to 1")
+ require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount), "acquire
takes refCount to 1")
require.Greater(t, seg.lastAccessed.Load(), closedAccessed, "a real
read refreshes lastAccessed")
got[0].DecRef()
- require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "DecRef
returns the reopened segment to closed")
- require.Nil(t, seg.index)
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
+ require.NotNil(t, seg.index, "DecRef to zero leaves the reopened
segment dormant (open), not closed")
}
-// TestSelectSegments_ReopenTrue_OpenSegmentBumpsAndRefreshes:
reopenClosed=true
-// on an OPEN segment pins it (+1) and refreshes lastAccessed without
re-opening.
-func TestSelectSegments_ReopenTrue_OpenSegmentBumpsAndRefreshes(t *testing.T) {
+// TestSelectSegments_ReopenTrue_DormantSegmentAcquiredAndRefreshed:
+// reopenClosed=true on a DORMANT open segment (refCount==0, index!=nil)
acquires
+// it (0->1) and refreshes lastAccessed without re-opening; DecRef returns it
to
+// dormant.
+func TestSelectSegments_ReopenTrue_DormantSegmentAcquiredAndRefreshed(t
*testing.T) {
dir := snapshotTestDir(t)
tsdb, sc := newEmptySnapshotTSDB(t, dir)
defer func() { require.NoError(t, tsdb.Close()) }()
seg := createSegmentWithSeries(t, tsdb, snapshotDay(0), 0)
require.NotNil(t, seg.index)
- require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount), "open
baseline refCount is 1")
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "dormant
baseline: open, refCount 0")
seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
before := seg.lastAccessed.Load()
got, err := sc.selectSegments(allTimeRange(), true)
require.NoError(t, err)
require.Len(t, got, 1)
- require.NotNil(t, seg.index, "open segment stays open")
- require.Equal(t, int32(2), atomic.LoadInt32(&seg.refCount), "incRef
bumps the open segment")
+ require.NotNil(t, seg.index, "dormant open segment stays open (no
reopen needed)")
+ require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount), "acquire
takes the dormant segment 0->1")
require.Greater(t, seg.lastAccessed.Load(), before, "a real read
refreshes lastAccessed")
got[0].DecRef()
- require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount))
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
}
// TestSelectSegments_ReopenFalse_ClosedSegmentNotReopened: reopenClosed=false
on
@@ -600,28 +607,31 @@ func
TestSelectSegments_ReopenFalse_ClosedSegmentNotReopened(t *testing.T) {
require.Nil(t, seg.index)
}
-// TestSelectSegments_ReopenFalse_OpenSegmentPinnedNoRefresh:
reopenClosed=false
-// on an OPEN segment pins it (+1) but does NOT refresh lastAccessed, so the
-// segment stays eligible for the next idle-close tick.
-func TestSelectSegments_ReopenFalse_OpenSegmentPinnedNoRefresh(t *testing.T) {
+// TestSelectSegments_ReopenFalse_DormantSegmentNotPinned: reopenClosed=false
on a
+// DORMANT open segment (refCount==0) returns it WITHOUT pinning. By design the
+// stats path never bumps a refCount==0 segment (that would race the idle
+// reclaimer); stats are best-effort. The segment stays open and its idle timer
+// is not refreshed.
+func TestSelectSegments_ReopenFalse_DormantSegmentNotPinned(t *testing.T) {
dir := snapshotTestDir(t)
tsdb, sc := newEmptySnapshotTSDB(t, dir)
defer func() { require.NoError(t, tsdb.Close()) }()
seg := createSegmentWithSeries(t, tsdb, snapshotDay(0), 0)
- require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount))
+ require.NotNil(t, seg.index)
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "dormant:
open, refCount 0")
seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
before := seg.lastAccessed.Load()
got, err := sc.selectSegments(allTimeRange(), false)
require.NoError(t, err)
require.Len(t, got, 1)
- require.NotNil(t, seg.index, "open segment stays open")
- require.Equal(t, int32(2), atomic.LoadInt32(&seg.refCount), "open
segment is pinned (+1)")
+ require.NotNil(t, seg.index, "dormant open segment stays open")
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "a dormant
segment is not pinned by the stats path")
require.Equal(t, before, seg.lastAccessed.Load(), "a stats peek must
not refresh lastAccessed")
- got[0].DecRef()
- require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount))
+ got[0].DecRef() // no-op: the segment was returned unpinned (refCount already 0)
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
}
// TestSelectSegments_ReopenFalse_InUseSegmentPinned: reopenClosed=false on a
@@ -633,17 +643,17 @@ func TestSelectSegments_ReopenFalse_InUseSegmentPinned(t
*testing.T) {
defer func() { require.NoError(t, tsdb.Close()) }()
seg := createSegmentWithSeries(t, tsdb, snapshotDay(0), 0)
- require.NoError(t, seg.incRef(context.Background())) // a concurrent
reader holds a ref
- require.Equal(t, int32(2), atomic.LoadInt32(&seg.refCount), "in-use:
baseline + one active reader")
+ require.NoError(t, seg.incRef(context.Background())) // an active
reader holds a ref
+ require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount), "one active
reader")
got, err := sc.selectSegments(allTimeRange(), false)
require.NoError(t, err)
require.Len(t, got, 1)
- require.Equal(t, int32(3), atomic.LoadInt32(&seg.refCount), "in-use
open segment is pinned (+1)")
+ require.Equal(t, int32(2), atomic.LoadInt32(&seg.refCount), "an in-use
(refCount>0) segment is pinned (+1)")
got[0].DecRef() // release the selectSegments pin
- seg.DecRef() // release the simulated active reader
- require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount), "back to
open baseline")
+ seg.DecRef() // release the active reader
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "back to
dormant")
}
// TestSelectSegments_TimeRangeFilters: only segments overlapping the requested
@@ -699,3 +709,56 @@ func TestSelectSegments_PublicWrapper(t *testing.T) {
require.NoError(t, err)
require.Nil(t, got, "a closed database returns no segments")
}
+
+// TestSelectSegments_FiltersRetentionExpiredSegment verifies that
+// SelectSegments(_, true) excludes a segment whose whole time range has passed
+// the retention deadline, and that the deadline tracks the controller's
+// injectable clock: advancing the mock clock past the TTL expires the segment
+// even though it is still on disk (retention removes it only on its next run).
+func TestSelectSegments_FiltersRetentionExpiredSegment(t *testing.T) {
+ dir := snapshotTestDir(t)
+ tsdb, sc := openSnapshotTSDB(t, dir, 3) // TTL = 3 days
+ defer func() { require.NoError(t, tsdb.Close()) }()
+
+ seg := createSegmentWithSeries(t, tsdb, snapshotTestBase, 0)
+ require.NotNil(t, seg.index)
+
+ // Clock at snapshotTestBase: the segment is within the TTL and is
returned.
+ got, err := tsdb.SelectSegments(allTimeRange(), true)
+ require.NoError(t, err)
+ require.Len(t, got, 1, "a segment within the TTL is returned")
+ for _, g := range got {
+ g.DecRef()
+ }
+
+ // Advance the controller clock past the TTL so the segment's whole
range is
+ // behind the retention deadline; SelectSegments must now filter it out.
+ sc.clock.(timestamp.MockClock).Add(10 * 24 * time.Hour)
+ got, err = tsdb.SelectSegments(allTimeRange(), true)
+ require.NoError(t, err)
+ require.Empty(t, got, "a fully retention-expired segment is excluded
from query results")
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "the
filtered segment leaks no reference")
+}
+
+// TestGetExpiredSegmentsTimeRange_TracksInjectableClock verifies that the
+// expired-segment time range is computed against the controller's injectable
+// clock: a segment within the TTL reports nothing, and advancing the mock
clock
+// past the TTL makes it expired. (Guards against reverting to wall-clock
+// time.Now(), which would mis-expire mock-clock-dated segments.)
+func TestGetExpiredSegmentsTimeRange_TracksInjectableClock(t *testing.T) {
+ dir := snapshotTestDir(t)
+ tsdb, sc := openSnapshotTSDB(t, dir, 3) // TTL = 3 days
+ defer func() { require.NoError(t, tsdb.Close()) }()
+
+ seg := createSegmentWithSeries(t, tsdb, snapshotTestBase, 0)
+
+ // Clock at snapshotTestBase: the segment is within the TTL, nothing
expired.
+ tr := sc.getExpiredSegmentsTimeRange()
+ require.True(t, tr.Start.IsZero(), "no segment is expired while the
clock is within the TTL")
+
+ // Advance the clock past the TTL: the segment is now expired and
reported.
+ sc.clock.(timestamp.MockClock).Add(10 * 24 * time.Hour)
+ tr = sc.getExpiredSegmentsTimeRange()
+ require.Equal(t, seg.Start, tr.Start, "the expired range starts at the
now-expired segment")
+ require.Equal(t, seg.End, tr.End, "the expired range ends at the
now-expired segment")
+}
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index 9dff036f1..ae3275d04 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -405,21 +405,17 @@ func (d *database[T, O]) collect() {
return
}
d.metrics.lastTickTime.Set(float64(d.latestTickTime.Load()))
- refCount := int32(0)
- ss, _ := d.segmentController.segments(context.Background(), false)
- for _, s := range ss {
- if atomic.LoadInt32(&s.refCount) <= 0 {
- continue
- }
- tables, _ := s.Tables()
- for _, t := range tables {
- t.Collect(d.segmentController.metrics)
+ // Collect metrics for every open segment. Under the dormant-refcount
model
+ // an open segment usually has refCount==0, so we cannot rely on a
reference
+ // bump to pin it: collectOpenMetrics gathers under the segment's read
lock,
+ // which a concurrent reclaim (closeIfIdle takes the write lock) cannot
race.
+ openSegments := int32(0)
+ for _, s := range d.segmentController.copySegments() {
+ if s.collectOpenMetrics(d.segmentController.metrics) {
+ openSegments++
}
- s.collectMetrics()
- s.DecRef()
- refCount += atomic.LoadInt32(&s.refCount)
}
- d.totalSegRefs.Set(float64(refCount))
+ d.totalSegRefs.Set(float64(openSegments))
if d.metrics.schedulerMetrics == nil {
return
}
diff --git a/banyand/internal/storage/tsdb_test.go
b/banyand/internal/storage/tsdb_test.go
index e679a6a74..e2ac247f8 100644
--- a/banyand/internal/storage/tsdb_test.go
+++ b/banyand/internal/storage/tsdb_test.go
@@ -707,11 +707,11 @@ func TestCollectWithPartialClosedSegments(t *testing.T) {
ss, _ = sc.segments(context.Background(), false)
for _, s := range ss {
if s.Start.Equal(segmentDates[0]) ||
s.Start.Equal(segmentDates[2]) {
- require.Equal(t, int32(0), s.refCount, "Segment should
be closed")
+ require.Nil(t, s.index, "Segment should be closed")
} else {
- require.Greater(t, s.refCount, int32(0), "Segment
should be open")
+ require.NotNil(t, s.index, "Segment should be open")
}
- s.DecRef() // Release reference
+ s.DecRef() // no-op for dormant/closed segments
}
// Call the collect method with mixed open/closed segments
diff --git a/banyand/measure/write_data_segmentref_test.go
b/banyand/measure/write_data_segmentref_test.go
index b71f5e742..c1a36f7eb 100644
--- a/banyand/measure/write_data_segmentref_test.go
+++ b/banyand/measure/write_data_segmentref_test.go
@@ -59,8 +59,7 @@ func TestSyncReceiver_SegmentRefOwnership(t *testing.T) {
seg, err := db.CreateSegmentIfNotExist(segTime)
require.NoError(t, err)
- seg.DecRef()
- seg.DecRef() // refCount -> 0 (idle-closed)
+ seg.DecRef() // the single create reference; refCount = 0 (dormant:
open, no active reference)
baselineOpens := openShardCount.Load()
segA, err := db.CreateSegmentIfNotExist(segTime)
@@ -106,8 +105,8 @@ func TestSyncReceiver_SegmentRefOwnership(t *testing.T) {
defer segC.DecRef()
_, err = segC.CreateTSTableIfNotExist(common.ShardID(0))
require.NoError(t, err)
- require.Equal(t, int32(1), openShardCount.Load()-beforeProbe,
- "REGRESSION: Close did not call segment.DecRef()")
+ require.Equal(t, int32(0), openShardCount.Load()-beforeProbe,
+ "after Close the dormant segment and its already-open shard are
reused, no new shard")
}
// TestSyncChunkCallback_CreatePartHandler_StoresSegment drives the real
@@ -129,8 +128,7 @@ func
TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) {
warmup, err := db.CreateSegmentIfNotExist(segTime)
require.NoError(t, err)
- warmup.DecRef()
- warmup.DecRef()
+ warmup.DecRef() // the single create reference; refCount = 0 (dormant:
open, no active reference)
baselineOpens := openShardCount.Load()
const groupName = "test-group"
@@ -179,8 +177,8 @@ func
TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) {
defer segProbe.DecRef()
_, err = segProbe.CreateTSTableIfNotExist(common.ShardID(0))
require.NoError(t, err)
- require.Equal(t, int32(1), openShardCount.Load()-beforeProbe,
- "REGRESSION: Close did not call segment.DecRef()")
+ require.Equal(t, int32(0), openShardCount.Load()-beforeProbe,
+ "after Close the dormant segment and its already-open shard are
reused, no new shard")
}
func openTestTSDBForRefTest(t *testing.T, tmpPath string, openShardCount
*atomic.Int32) storage.TSDB[*tsTable, option] {
@@ -340,8 +338,7 @@ func TestSegmentCreateTS_ConsistencyAcrossPaths(t
*testing.T) {
historicalStart := ir.Standard(historicalRaw)
histSeg, err := db.CreateSegmentIfNotExist(historicalStart)
require.NoError(t, err)
- histSeg.DecRef()
- histSeg.DecRef() // refCount->0, idle-closed
+ histSeg.DecRef() // the single create reference; refCount = 0 (dormant:
open, no active reference)
// New raw datapoint with an off-grid (06:00 inside a 5-day bucket) ts.
// liaison-side sidx pre-standardizes it; data-node-side part chunk-sync
diff --git a/banyand/stream/write_data_segmentref_test.go
b/banyand/stream/write_data_segmentref_test.go
index 509d333b9..7b6863969 100644
--- a/banyand/stream/write_data_segmentref_test.go
+++ b/banyand/stream/write_data_segmentref_test.go
@@ -59,8 +59,7 @@ func TestSyncReceiver_SegmentRefOwnership(t *testing.T) {
seg, err := db.CreateSegmentIfNotExist(segTime)
require.NoError(t, err)
- seg.DecRef()
- seg.DecRef() // refCount -> 0 (idle-closed)
+ seg.DecRef() // the single create reference; refCount = 0 (dormant:
open, no active reference)
baselineOpens := openShardCount.Load()
segA, err := db.CreateSegmentIfNotExist(segTime)
@@ -106,8 +105,8 @@ func TestSyncReceiver_SegmentRefOwnership(t *testing.T) {
defer segC.DecRef()
_, err = segC.CreateTSTableIfNotExist(common.ShardID(0))
require.NoError(t, err)
- require.Equal(t, int32(1), openShardCount.Load()-beforeProbe,
- "REGRESSION: Close did not call segment.DecRef()")
+ require.Equal(t, int32(0), openShardCount.Load()-beforeProbe,
+ "after Close the dormant segment and its already-open shard are
reused, no new shard")
}
// TestSyncChunkCallback_CreatePartHandler_StoresSegment drives the real
@@ -132,8 +131,7 @@ func
TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) {
warmup, err := db.CreateSegmentIfNotExist(segTime)
require.NoError(t, err)
- warmup.DecRef()
- warmup.DecRef()
+ warmup.DecRef() // the single create reference; refCount = 0 (dormant:
open, no active reference)
baselineOpens := openShardCount.Load()
const groupName = "test-group"
@@ -180,8 +178,8 @@ func
TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) {
defer segProbe.DecRef()
_, err = segProbe.CreateTSTableIfNotExist(common.ShardID(0))
require.NoError(t, err)
- require.Equal(t, int32(1), openShardCount.Load()-beforeProbe,
- "REGRESSION: Close did not call segment.DecRef()")
+ require.Equal(t, int32(0), openShardCount.Load()-beforeProbe,
+ "after Close the dormant segment and its already-open shard are
reused, no new shard")
}
func openTestTSDBForRefTest(t *testing.T, tmpPath string, openShardCount
*atomic.Int32) storage.TSDB[*tsTable, option] {
@@ -315,8 +313,7 @@ func TestSegmentCreateTS_ConsistencyAcrossPaths(t
*testing.T) {
historicalStart := ir.Standard(time.Date(2026, 4, 10, 0, 0, 0, 0,
time.Local))
histSeg, err := db.CreateSegmentIfNotExist(historicalStart)
require.NoError(t, err)
- histSeg.DecRef()
- histSeg.DecRef()
+ histSeg.DecRef() // the single create reference; refCount = 0 (dormant:
open, no active reference)
rawTS := time.Date(2026, 5, 15, 6, 0, 0, 0, time.Local)
wantNewStart := ir.Standard(rawTS)
diff --git a/banyand/trace/write_data_segmentref_test.go
b/banyand/trace/write_data_segmentref_test.go
index d170bb3cd..6770b9c13 100644
--- a/banyand/trace/write_data_segmentref_test.go
+++ b/banyand/trace/write_data_segmentref_test.go
@@ -74,12 +74,12 @@ func TestSyncReceiver_SegmentRefOwnership(t *testing.T) {
segTime := time.Date(2026, 4, 17, 0, 0, 0, 0, time.UTC)
- // Drive the segment into the cold-0 "idle-closed" state: segment object
- // still exists in sc.lst but refCount=0 and shards have been released.
+ // Drop the segment to refCount 0. Under the dormant-refcount model it stays
+ // open (loaded), so re-acquiring it does NOT reload shards: after Part A opens
+ // shard 0 on demand, the open count stays flat across the later sync sessions.
seg, err := db.CreateSegmentIfNotExist(segTime)
require.NoError(t, err)
- seg.DecRef() // normal caller release -> refCount = 1
- seg.DecRef() // simulate idle close -> refCount = 0
+ seg.DecRef() // the single create reference; refCount = 0 (dormant:
open, no active reference)
baselineOpens := openShardCount.Load()
// ---------- Part A: simulate syncChunkCallback.CreatePartHandler
(fix) ----------
@@ -104,8 +104,8 @@ func TestSyncReceiver_SegmentRefOwnership(t *testing.T) {
opensAfterA := openShardCount.Load()
require.Equal(t, int32(1), opensAfterA-baselineOpens,
- "Part A's CreateSegmentIfNotExist should re-initialize exactly
once "+
- "(segment was idle-closed, so refCount=0 -> initialize
branch)")
+ "Part A creates shard 0 for the first time (the dormant segment
is reused "+
+ "without a reload, but its shard table is opened on
demand)")
// ---------- Part B: another sync session starts while A is still in
flight ----------
// With the fix, segA keeps refCount>=1, so Part B's
CreateSegmentIfNotExist
@@ -146,11 +146,9 @@ func TestSyncReceiver_SegmentRefOwnership(t *testing.T) {
require.Nil(t, partCtxB.segment,
"REGRESSION: Close must clear syncPartContext.segment
(`s.segment = nil`)")
- // ---------- Verify Close actually DecRef'd (not just cleared the
field) ----------
- // After both Closes, refCount should be 0. The next
CreateSegmentIfNotExist
- // must therefore re-initialize and invoke TSTableCreator. If Close
forgot
- // to call s.segment.DecRef(), refCount would still be >=1 and this
- // assertion would fail.
+ // ---------- After Close, the dormant segment is reused without reload
----------
+ // Both Closes drop refCount back to 0; the segment stays open
(dormant), so
+ // the next CreateSegmentIfNotExist re-acquires it without reloading
shards.
segC, err := db.CreateSegmentIfNotExist(segTime)
require.NoError(t, err)
defer segC.DecRef()
@@ -158,10 +156,8 @@ func TestSyncReceiver_SegmentRefOwnership(t *testing.T) {
require.NoError(t, err)
opensAfterC := openShardCount.Load()
- require.Equal(t, int32(1), opensAfterC-opensAfterB,
- "REGRESSION: after closing both partCtx, the next
CreateSegmentIfNotExist did "+
- "not re-initialize. This means Close did not call
segment.DecRef(). "+
- "Check `s.segment.DecRef()` is still in
syncPartContext.Close.")
+ require.Equal(t, int32(0), opensAfterC-opensAfterB,
+ "the dormant segment is reused without reload after Close")
}
func openTestTSDBForRefTest(t *testing.T, tmpPath string, openShardCount
*atomic.Int32) storage.TSDB[*tsTable, option] {
@@ -229,13 +225,12 @@ func
TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) {
segTime := time.Date(2026, 4, 17, 0, 0, 0, 0, time.Local)
- // Drive the segment into the idle-closed state (refCount=0) so that the
- // probe at the end of this test reliably exercises the `initialize`
- // branch of incRef if and only if Close actually DecRef'd.
+ // Drop the segment to refCount 0. Under the dormant-refcount model it
stays
+ // open (loaded), so subsequent CreateSegmentIfNotExist calls
re-acquire it
+ // without reloading shards.
warmup, err := db.CreateSegmentIfNotExist(segTime)
require.NoError(t, err)
- warmup.DecRef()
- warmup.DecRef() // refCount -> 0
+ warmup.DecRef() // the single create reference; refCount = 0 (dormant:
open, no active reference)
baselineOpens := openShardCount.Load()
const groupName = "test-group"
@@ -265,11 +260,11 @@ func
TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) {
"partCtx struct literal and that `defer
segment.DecRef()` has not been "+
"re-introduced at the top of CreatePartHandler.")
- // At this point CreatePartHandler has driven refCount 0 -> 1 via
initialize
- // (loadShards ran -> TSTableCreator was called once).
+ // CreatePartHandler acquired the dormant segment (refCount 0 -> 1) and
+ // opened shard 0 on demand for the first time.
afterCreate := openShardCount.Load()
require.Equal(t, int32(1), afterCreate-baselineOpens,
- "CreatePartHandler should have re-initialized the idle-closed
segment once")
+ "CreatePartHandler opens shard 0 for the first time on the
dormant segment")
// CRITICAL mid-stream probe (between CreatePartHandler and Close).
// Under the fix, partCtx holds segment.refCount >= 1, so a concurrent
@@ -307,10 +302,8 @@ func
TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) {
_, err = segProbe.CreateTSTableIfNotExist(common.ShardID(0))
require.NoError(t, err)
- require.Equal(t, int32(1), openShardCount.Load()-beforeProbe,
- "REGRESSION: after partCtx.Close, the next
CreateSegmentIfNotExist did not "+
- "re-initialize. This means Close did not call
s.segment.DecRef(). "+
- "Check that the Close method still contains
`s.segment.DecRef()`.")
+ require.Equal(t, int32(0), openShardCount.Load()-beforeProbe,
+ "after Close the dormant segment is reused without reload")
}
// newTestSchemaRepo builds a minimal *schemaRepo whose loadTSDB returns the
@@ -421,8 +414,7 @@ func TestSegmentCreateTS_ConsistencyAcrossPaths(t
*testing.T) {
historicalStart := ir.Standard(time.Date(2026, 4, 10, 0, 0, 0, 0,
time.Local))
histSeg, err := db.CreateSegmentIfNotExist(historicalStart)
require.NoError(t, err)
- histSeg.DecRef()
- histSeg.DecRef()
+ histSeg.DecRef() // the single create reference; refCount = 0 (dormant:
open, no active reference)
rawTS := time.Date(2026, 5, 15, 6, 0, 0, 0, time.Local)
wantNewStart := ir.Standard(rawTS)