This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 3d4539308 fix: release bluge index writers on segment rotation 
(#13874) (#1128)
3d4539308 is described below

commit 3d45393089b97b108cc82e2b7188b4513aa807d7
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu May 21 14:41:49 2026 +0800

    fix: release bluge index writers on segment rotation (#13874) (#1128)
---
 CHANGES.md                               |  1 +
 banyand/internal/storage/rotation.go     | 13 +++++-
 banyand/internal/storage/segment.go      | 73 ++++++++++++++++++++++++++------
 banyand/internal/storage/segment_test.go |  4 +-
 banyand/internal/storage/tsdb_test.go    | 28 ++++++------
 banyand/measure/metadata.go              |  4 +-
 banyand/stream/metadata.go               |  4 +-
 banyand/trace/metadata.go                |  4 +-
 banyand/trace/metadata_test.go           |  8 ++++
 9 files changed, 106 insertions(+), 33 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 6d9c3f311..e998bf1b1 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -92,6 +92,7 @@ Release Notes.
 - Fix `CollectDataInfo` and `CollectLiaisonInfo` not handling 
`CATALOG_PROPERTY` groups.
 - Fix lifecycle migration where the receiving node could create segments 
shorter than the configured `SegmentInterval`.
 - Fail fast on incompatible storage version at boot. Previously the server 
would start in a degraded `SERVING` state with affected groups un-loaded 
because the property schema-registry retry loop swallowed the 
version-incompatibility panic. Compatible versions are listed in 
`banyand/internal/storage/versions.yml`.
+- Release bluge index writers on segment rotation so `analysisWorker` pools 
sized from `GOMAXPROCS` don't accumulate across rotations. Two layered defects 
kept the existing idle-segment reclaim path from running: `segmentIdleTimeout` 
defaulted to `0` (which disabled the 10-minute reclaim ticker), and `incRef` 
refreshed `lastAccessed` on every rotation tick so `closeIdleSegments` never 
observed an idle segment. The fix defaults `segmentIdleTimeout` to `time.Hour`, 
moves the `lastAccessed` bump to real read/write call [...]
 
 ### Chores
 
diff --git a/banyand/internal/storage/rotation.go 
b/banyand/internal/storage/rotation.go
index 04028ae22..2b680ddbb 100644
--- a/banyand/internal/storage/rotation.go
+++ b/banyand/internal/storage/rotation.go
@@ -57,15 +57,24 @@ func (d *database[T, O]) startRotationTask() error {
                var idleCheckTicker *time.Ticker
                var idleCheckC <-chan time.Time
 
-               // Only create the ticker if idleTimeout is at least 1 second
+               // Only create the ticker if idleTimeout is at least 1 second.
+               // When disabled, idle segments are never reclaimed and their
+               // bluge writers (analysisWorker pools) accumulate across 
rotations.
                if d.segmentController.idleTimeout >= time.Second {
                        idleCheckTicker = time.NewTicker(10 * time.Minute)
                        idleCheckC = idleCheckTicker.C
+                       d.logger.Info().
+                               Stringer("idle_timeout", 
d.segmentController.idleTimeout).
+                               Msg("idle segment reclaimer enabled")
                        defer func() {
                                if idleCheckTicker != nil {
                                        idleCheckTicker.Stop()
                                }
                        }()
+               } else {
+                       d.logger.Warn().
+                               Stringer("idle_timeout", 
d.segmentController.idleTimeout).
+                               Msg("idle segment reclaimer disabled 
(idle_timeout < 1s); bluge writers will not be released on segment rotation")
                }
 
                for {
@@ -122,7 +131,7 @@ func (d *database[T, O]) startRotationTask() error {
                        case <-idleCheckC:
                                func() {
                                        d.logger.Debug().Msg("checking for idle 
segments")
-                                       closedCount := 
d.segmentController.closeIdleSegments(taskCtx)
+                                       closedCount := 
d.segmentController.closeIdleSegments()
                                        if closedCount > 0 {
                                                d.logger.Info().Int("count", 
closedCount).Msg("closed idle segments")
                                        }
diff --git a/banyand/internal/storage/segment.go 
b/banyand/internal/storage/segment.go
index 5204842a9..711bb62ac 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -164,6 +164,14 @@ func (s *segment[T, O]) TablesWithShardIDs() (tt []T, 
shardIDs []common.ShardID,
 // guaranteed that the segment is alive (s.index != nil, shards loaded) and
 // will stay alive until the matching DecRef.
 //
+// incRef intentionally does NOT touch lastAccessed: the idle-segment
+// reclaimer (closeIdleSegments) decides eligibility from that field, and
+// the rotation-tick scan iterates every segment via incRef on each tick.
+// Refreshing lastAccessed here would defeat the reclaimer for any segment
+// outside the active write window. Callers that represent a real read or
+// write touch must bump lastAccessed explicitly (see selectSegments and
+// createSegment); housekeeping iterators must not.
+//
 // The CAS loop closes a TOCTOU race that the older "Load + AddInt32"
 // shape suffered from: a goroutine could read refCount=1, race against a
 // concurrent DecRef that drives it to 0 and triggers performCleanup
@@ -179,7 +187,6 @@ func (s *segment[T, O]) TablesWithShardIDs() (tt []T, 
shardIDs []common.ShardID,
 // DecRef that drives refCount to 0 sees no concurrent +1 sneak in
 // before performCleanup runs.
 func (s *segment[T, O]) incRef(ctx context.Context) error {
-       s.lastAccessed.Store(time.Now().UnixNano())
        for {
                current := atomic.LoadInt32(&s.refCount)
                if current <= 0 {
@@ -426,6 +433,7 @@ func (sc *segmentController[T, O]) selectSegments(timeRange 
timestamp.TimeRange)
        defer sc.RUnlock()
        last := len(sc.lst) - 1
        ctx := context.WithValue(context.Background(), logger.ContextKey, sc.l)
+       now := time.Now().UnixNano()
        for i := range sc.lst {
                s := sc.lst[last-i]
                if s.GetTimeRange().End.Before(timeRange.Start) {
@@ -435,6 +443,7 @@ func (sc *segmentController[T, O]) selectSegments(timeRange 
timestamp.TimeRange)
                        if err = s.incRef(ctx); err != nil {
                                return nil, err
                        }
+                       s.lastAccessed.Store(now)
                        tt = append(tt, s)
                }
        }
@@ -446,7 +455,11 @@ func (sc *segmentController[T, O]) createSegment(ts 
time.Time) (*segment[T, O],
        if err != nil {
                return nil, err
        }
-       return s, s.incRef(context.WithValue(context.Background(), 
logger.ContextKey, sc.l))
+       if err = s.incRef(context.WithValue(context.Background(), 
logger.ContextKey, sc.l)); err != nil {
+               return nil, err
+       }
+       s.lastAccessed.Store(time.Now().UnixNano())
+       return s, nil
 }
 
 func (sc *segmentController[T, O]) segments(ctx context.Context, reopenClosed 
bool) (ss []*segment[T, O], err error) {
@@ -469,26 +482,60 @@ func (sc *segmentController[T, O]) segments(ctx 
context.Context, reopenClosed bo
        return r, nil
 }
 
-func (sc *segmentController[T, O]) closeIdleSegments(ctx context.Context) int {
+func (sc *segmentController[T, O]) closeIdleSegments() int {
        maxIdleTime := sc.idleTimeout
 
        now := time.Now().UnixNano()
        idleThreshold := now - maxIdleTime.Nanoseconds()
 
-       segs, _ := sc.segments(ctx, false)
-       closedCount := 0
+       // Take our own snapshot rather than going through segments(false): that
+       // helper's "Load>0 then Add" bump is non-atomic and can resurrect a
+       // segment that DecRef just cleaned up, and its caller has no way to 
tell
+       // which entries it actually bumped. closeIdleSegments must know, 
because
+       // it issues an unconditional DecRef per entry to release the bump; if a
+       // concurrent selectSegments reopens an entry we didn't bump, that 
DecRef
+       // would drop the query's only live ref and trigger performCleanup under
+       // active use. Iterate sc.lst under RLock and CAS-bump each open 
segment;
+       // closed segments are recorded as "not bumped" and skipped in the loop.
+       sc.RLock()
+       segs := make([]*segment[T, O], 0, len(sc.lst))
+       bumped := make([]bool, 0, len(sc.lst))
+       refAtBump := make([]int32, 0, len(sc.lst))
+       for _, s := range sc.lst {
+               didBump := false
+               var snapRef int32
+               for {
+                       current := atomic.LoadInt32(&s.refCount)
+                       if current <= 0 {
+                               break
+                       }
+                       if atomic.CompareAndSwapInt32(&s.refCount, current, 
current+1) {
+                               didBump = true
+                               snapRef = current
+                               break
+                       }
+               }
+               segs = append(segs, s)
+               bumped = append(bumped, didBump)
+               refAtBump = append(refAtBump, snapRef)
+       }
+       sc.RUnlock()
 
-       for _, seg := range segs {
-               lastAccess := seg.lastAccessed.Load()
-               // Only consider segments that have been idle for longer than 
the threshold
-               // and have active references (are not already closed)
-               if lastAccess < idleThreshold && 
atomic.LoadInt32(&seg.refCount) > 0 {
-                       seg.DecRef()
+       closedCount := 0
+       for i, seg := range segs {
+               if !bumped[i] {
+                       continue
                }
-               seg.DecRef()
-               if atomic.LoadInt32(&seg.refCount) == 0 {
+               // Only close when the snapshot proved refCount==1 (baseline 
only):
+               // a higher value means other callers hold active references, 
and
+               // decrementing past our bump would steal one of their refs,
+               // potentially triggering performCleanup under active use on a
+               // subsequent tick.
+               if refAtBump[i] == 1 && seg.lastAccessed.Load() < idleThreshold 
{
+                       seg.DecRef()
                        closedCount++
                }
+               seg.DecRef()
        }
 
        return closedCount
diff --git a/banyand/internal/storage/segment_test.go 
b/banyand/internal/storage/segment_test.go
index d203d1d7f..41fcfccf5 100644
--- a/banyand/internal/storage/segment_test.go
+++ b/banyand/internal/storage/segment_test.go
@@ -352,7 +352,7 @@ func TestCloseIdleAndSelectSegments(t *testing.T) {
        seg2.lastAccessed.Store(time.Now().UnixNano())
 
        // Close idle segments
-       closedCount := sc.closeIdleSegments(context.Background())
+       closedCount := sc.closeIdleSegments()
 
        // We should have closed 2 segments (seg1 and seg3)
        assert.Equal(t, 2, closedCount)
@@ -611,7 +611,7 @@ func TestDeleteExpiredSegmentsWithClosedSegments(t 
*testing.T) {
        segments[5].lastAccessed.Store(activeTime) // day6 - not expired
 
        // Close idle segments
-       closedCount := sc.closeIdleSegments(context.Background())
+       closedCount := sc.closeIdleSegments()
        assert.Equal(t, 3, closedCount, "Should have closed 3 segments")
 
        // Verify segments 0, 2, and 4 are closed
diff --git a/banyand/internal/storage/tsdb_test.go 
b/banyand/internal/storage/tsdb_test.go
index a826e006f..76908953a 100644
--- a/banyand/internal/storage/tsdb_test.go
+++ b/banyand/internal/storage/tsdb_test.go
@@ -606,38 +606,40 @@ func TestCollectWithPartialClosedSegments(t *testing.T) {
 
        var segments []Segment[*MockTSTable, any]
 
-       // Create segments and keep track of them
+       // Create segments and keep track of them.
+       // openSegment.initialize sets refCount=1, then createSegment.incRef
+       // bumps it to 2. The DecRef below releases the createSegment ref,
+       // leaving refCount=1 (the initialize ref) so the idle reclaimer can
+       // CAS-bump and see refAtBump==1.
        for _, date := range segmentDates {
                mc.Set(date)
-               seg, err := tsdb.CreateSegmentIfNotExist(date)
-               require.NoError(t, err)
+               seg, segErr := tsdb.CreateSegmentIfNotExist(date)
+               require.NoError(t, segErr)
                require.NotNil(t, seg)
                segments = append(segments, seg)
-               seg.DecRef() // Release reference
+               seg.DecRef() // Release createSegment's ref; refCount 2->1
        }
 
        // Set a specific tick time for testing
        tickTime := int64(123456789)
        db.latestTickTime.Store(tickTime)
 
-       // Manually close some segments (first and third)
-       segments[0].DecRef() // Close first segment
-       segments[2].DecRef() // Close third segment
-
-       // Manually set the closed segments to have idle time in the past
+       // Mark the first and third segments as idle by backdating their
+       // lastAccessed timestamp. Do NOT DecRef them — they must stay alive
+       // (refCount=1) so closeIdleSegments can CAS-bump (refAtBump==1) and
+       // close them.
        sc := db.segmentController
        ss, _ := sc.segments(context.Background(), false)
        for _, s := range ss {
-               // Find segments we want to mark as idle (first and third)
                if s.Start.Equal(segmentDates[0]) || 
s.Start.Equal(segmentDates[2]) {
                        s.lastAccessed.Store(time.Now().Add(-2 * 
time.Hour).UnixNano())
-                       s.DecRef() // Force close
                }
                s.DecRef() // Release our reference from segments()
        }
 
-       // Call closeIdleSegments to ensure our target segments are closed
-       closedCount := sc.closeIdleSegments(context.Background())
+       // closeIdleSegments should close the 2 idle segments (refAtBump==1 and
+       // lastAccessed older than idleThreshold) and leave the other 2 open.
+       closedCount := sc.closeIdleSegments()
        require.Equal(t, 2, closedCount, "Should have closed 2 segments")
 
        // Verify segments 0 and 2 are closed, 1 and 3 are open
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 3763888fb..93e857a70 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -720,7 +720,9 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) 
(resourceSchema.DB, error
        shardNum := ro.ShardNum
        ttl := ro.Ttl
        segInterval := ro.SegmentInterval
-       segmentIdleTimeout := time.Duration(0)
+       // Non-zero default so the idle-segment reclaimer ticker actually starts
+       // (storage/rotation.go gates it on >=1s). Staged Close paths override 
below.
+       segmentIdleTimeout := time.Hour
        disableRetention := false
        disableRotation := false
        if len(ro.Stages) > 0 && len(s.nodeLabels) > 0 {
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 1cdb02945..99db76512 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -559,7 +559,9 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) 
(resourceSchema.DB, error
        shardNum := ro.ShardNum
        ttl := ro.Ttl
        segInterval := ro.SegmentInterval
-       segmentIdleTimeout := time.Duration(0)
+       // Non-zero default so the idle-segment reclaimer ticker actually starts
+       // (storage/rotation.go gates it on >=1s). Staged Close paths override 
below.
+       segmentIdleTimeout := time.Hour
        disableRetention := false
        disableRotation := false
        if len(ro.Stages) > 0 && len(s.nodeLabels) > 0 {
diff --git a/banyand/trace/metadata.go b/banyand/trace/metadata.go
index da9789a37..d2719e03c 100644
--- a/banyand/trace/metadata.go
+++ b/banyand/trace/metadata.go
@@ -581,7 +581,9 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) 
(resourceSchema.DB, error
        shardNum := ro.ShardNum
        ttl := ro.Ttl
        segInterval := ro.SegmentInterval
-       segmentIdleTimeout := time.Duration(0)
+       // Non-zero default so the idle-segment reclaimer ticker actually starts
+       // (storage/rotation.go gates it on >=1s). Staged Close paths override 
below.
+       segmentIdleTimeout := time.Hour
        disableRetention := false
        disableRotation := false
        if len(ro.Stages) > 0 && len(s.nodeLabels) > 0 {
diff --git a/banyand/trace/metadata_test.go b/banyand/trace/metadata_test.go
index dae19e9ba..98c58b34d 100644
--- a/banyand/trace/metadata_test.go
+++ b/banyand/trace/metadata_test.go
@@ -372,9 +372,17 @@ var _ = Describe("Metadata", func() {
                                Eventually(func() int64 {
                                        return getFilePartCount(svcs, groupName)
                                }, 
flags.EventuallyTimeout).Should(BeNumerically(">=", 1))
+                               filePartCountAfterFirstBatch := 
getFilePartCount(svcs, groupName)
                                changeTraceExtraTagType(svcs, traceName, 
groupName)
                                writeSchemaChangeTraceData(svcs, traceName, 
groupName, now.Add(-1*time.Hour), 3,
                                        writeTraceDataOptions{extraTag: 
extraTagString, traceIDPrefix: "trace_new_"})
+                               // Wait for the second batch to flush to disk, 
creating additional
+                               // file parts that the merge loop can pick up. 
Without this gate the
+                               // merge Eventually below also has to absorb 
flush latency, which can
+                               // exceed 30 s on resource-constrained CI 
runners with -race.
+                               Eventually(func() int64 {
+                                       return getFilePartCount(svcs, groupName)
+                               }, 
flags.EventuallyTimeout).Should(BeNumerically(">", 
filePartCountAfterFirstBatch))
                                partCountBeforeMerge := getTotalPartCount(svcs, 
groupName)
                                Eventually(func() int64 {
                                        return getTotalPartCount(svcs, 
groupName)

Reply via email to