This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 3d4539308 fix: release bluge index writers on segment rotation
(#13874) (#1128)
3d4539308 is described below
commit 3d45393089b97b108cc82e2b7188b4513aa807d7
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu May 21 14:41:49 2026 +0800
fix: release bluge index writers on segment rotation (#13874) (#1128)
---
CHANGES.md | 1 +
banyand/internal/storage/rotation.go | 13 +++++-
banyand/internal/storage/segment.go | 73 ++++++++++++++++++++++++++------
banyand/internal/storage/segment_test.go | 4 +-
banyand/internal/storage/tsdb_test.go | 28 ++++++------
banyand/measure/metadata.go | 4 +-
banyand/stream/metadata.go | 4 +-
banyand/trace/metadata.go | 4 +-
banyand/trace/metadata_test.go | 8 ++++
9 files changed, 106 insertions(+), 33 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 6d9c3f311..e998bf1b1 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -92,6 +92,7 @@ Release Notes.
- Fix `CollectDataInfo` and `CollectLiaisonInfo` not handling
`CATALOG_PROPERTY` groups.
- Fix lifecycle migration where the receiving node could create segments
shorter than the configured `SegmentInterval`.
- Fail fast on incompatible storage version at boot. Previously the server
would start in a degraded `SERVING` state with affected groups un-loaded
because the property schema-registry retry loop swallowed the
version-incompatibility panic. Compatible versions are listed in
`banyand/internal/storage/versions.yml`.
+- Release bluge index writers on segment rotation so `analysisWorker` pools
sized from `GOMAXPROCS` don't accumulate across rotations. Two layered defects
kept the existing idle-segment reclaim path from running: `segmentIdleTimeout`
defaulted to `0` (which disabled the 10-minute reclaim ticker), and `incRef`
refreshed `lastAccessed` on every rotation tick so `closeIdleSegments` never
observed an idle segment. The fix defaults `segmentIdleTimeout` to `time.Hour`, moves the `lastAccessed`
bump to real read/write call [...]
### Chores
diff --git a/banyand/internal/storage/rotation.go
b/banyand/internal/storage/rotation.go
index 04028ae22..2b680ddbb 100644
--- a/banyand/internal/storage/rotation.go
+++ b/banyand/internal/storage/rotation.go
@@ -57,15 +57,24 @@ func (d *database[T, O]) startRotationTask() error {
var idleCheckTicker *time.Ticker
var idleCheckC <-chan time.Time
- // Only create the ticker if idleTimeout is at least 1 second
+ // Only create the ticker if idleTimeout is at least 1 second.
+ // When disabled, idle segments are never reclaimed and their
+ // bluge writers (analysisWorker pools) accumulate across
rotations.
if d.segmentController.idleTimeout >= time.Second {
idleCheckTicker = time.NewTicker(10 * time.Minute)
idleCheckC = idleCheckTicker.C
+ d.logger.Info().
+ Stringer("idle_timeout",
d.segmentController.idleTimeout).
+ Msg("idle segment reclaimer enabled")
defer func() {
if idleCheckTicker != nil {
idleCheckTicker.Stop()
}
}()
+ } else {
+ d.logger.Warn().
+ Stringer("idle_timeout",
d.segmentController.idleTimeout).
+ Msg("idle segment reclaimer disabled
(idle_timeout < 1s); bluge writers will not be released on segment rotation")
}
for {
@@ -122,7 +131,7 @@ func (d *database[T, O]) startRotationTask() error {
case <-idleCheckC:
func() {
d.logger.Debug().Msg("checking for idle
segments")
- closedCount :=
d.segmentController.closeIdleSegments(taskCtx)
+ closedCount :=
d.segmentController.closeIdleSegments()
if closedCount > 0 {
d.logger.Info().Int("count",
closedCount).Msg("closed idle segments")
}
diff --git a/banyand/internal/storage/segment.go
b/banyand/internal/storage/segment.go
index 5204842a9..711bb62ac 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -164,6 +164,14 @@ func (s *segment[T, O]) TablesWithShardIDs() (tt []T,
shardIDs []common.ShardID,
// guaranteed that the segment is alive (s.index != nil, shards loaded) and
// will stay alive until the matching DecRef.
//
+// incRef intentionally does NOT touch lastAccessed: the idle-segment
+// reclaimer (closeIdleSegments) decides eligibility from that field, and
+// the rotation-tick scan iterates every segment via incRef on each tick.
+// Refreshing lastAccessed here would defeat the reclaimer for any segment
+// outside the active write window. Callers that represent a real read or
+// write touch must bump lastAccessed explicitly (see selectSegments and
+// createSegment); housekeeping iterators must not.
+//
// The CAS loop closes a TOCTOU race that the older "Load + AddInt32"
// shape suffered from: a goroutine could read refCount=1, race against a
// concurrent DecRef that drives it to 0 and triggers performCleanup
@@ -179,7 +187,6 @@ func (s *segment[T, O]) TablesWithShardIDs() (tt []T,
shardIDs []common.ShardID,
// DecRef that drives refCount to 0 sees no concurrent +1 sneak in
// before performCleanup runs.
func (s *segment[T, O]) incRef(ctx context.Context) error {
- s.lastAccessed.Store(time.Now().UnixNano())
for {
current := atomic.LoadInt32(&s.refCount)
if current <= 0 {
@@ -426,6 +433,7 @@ func (sc *segmentController[T, O]) selectSegments(timeRange
timestamp.TimeRange)
defer sc.RUnlock()
last := len(sc.lst) - 1
ctx := context.WithValue(context.Background(), logger.ContextKey, sc.l)
+ now := time.Now().UnixNano()
for i := range sc.lst {
s := sc.lst[last-i]
if s.GetTimeRange().End.Before(timeRange.Start) {
@@ -435,6 +443,7 @@ func (sc *segmentController[T, O]) selectSegments(timeRange
timestamp.TimeRange)
if err = s.incRef(ctx); err != nil {
return nil, err
}
+ s.lastAccessed.Store(now)
tt = append(tt, s)
}
}
@@ -446,7 +455,11 @@ func (sc *segmentController[T, O]) createSegment(ts
time.Time) (*segment[T, O],
if err != nil {
return nil, err
}
- return s, s.incRef(context.WithValue(context.Background(),
logger.ContextKey, sc.l))
+ if err = s.incRef(context.WithValue(context.Background(),
logger.ContextKey, sc.l)); err != nil {
+ return nil, err
+ }
+ s.lastAccessed.Store(time.Now().UnixNano())
+ return s, nil
}
func (sc *segmentController[T, O]) segments(ctx context.Context, reopenClosed
bool) (ss []*segment[T, O], err error) {
@@ -469,26 +482,60 @@ func (sc *segmentController[T, O]) segments(ctx
context.Context, reopenClosed bo
return r, nil
}
-func (sc *segmentController[T, O]) closeIdleSegments(ctx context.Context) int {
+func (sc *segmentController[T, O]) closeIdleSegments() int {
maxIdleTime := sc.idleTimeout
now := time.Now().UnixNano()
idleThreshold := now - maxIdleTime.Nanoseconds()
- segs, _ := sc.segments(ctx, false)
- closedCount := 0
+ // Take our own snapshot rather than going through segments(false): that
+ // helper's "Load>0 then Add" bump is non-atomic and can resurrect a
+ // segment that DecRef just cleaned up, and its caller has no way to
tell
+ // which entries it actually bumped. closeIdleSegments must know,
because
+ // it issues an unconditional DecRef per entry to release the bump; if a
+ // concurrent selectSegments reopens an entry we didn't bump, that
DecRef
+ // would drop the query's only live ref and trigger performCleanup under
+ // active use. Iterate sc.lst under RLock and CAS-bump each open
segment;
+ // closed segments are recorded as "not bumped" and skipped in the loop.
+ sc.RLock()
+ segs := make([]*segment[T, O], 0, len(sc.lst))
+ bumped := make([]bool, 0, len(sc.lst))
+ refAtBump := make([]int32, 0, len(sc.lst))
+ for _, s := range sc.lst {
+ didBump := false
+ var snapRef int32
+ for {
+ current := atomic.LoadInt32(&s.refCount)
+ if current <= 0 {
+ break
+ }
+ if atomic.CompareAndSwapInt32(&s.refCount, current,
current+1) {
+ didBump = true
+ snapRef = current
+ break
+ }
+ }
+ segs = append(segs, s)
+ bumped = append(bumped, didBump)
+ refAtBump = append(refAtBump, snapRef)
+ }
+ sc.RUnlock()
- for _, seg := range segs {
- lastAccess := seg.lastAccessed.Load()
- // Only consider segments that have been idle for longer than
the threshold
- // and have active references (are not already closed)
- if lastAccess < idleThreshold &&
atomic.LoadInt32(&seg.refCount) > 0 {
- seg.DecRef()
+ closedCount := 0
+ for i, seg := range segs {
+ if !bumped[i] {
+ continue
}
- seg.DecRef()
- if atomic.LoadInt32(&seg.refCount) == 0 {
+ // Only close when the snapshot proved refCount==1 (baseline
only):
+ // a higher value means other callers hold active references,
and
+ // decrementing past our bump would steal one of their refs,
+ // potentially triggering performCleanup under active use on a
+ // subsequent tick.
+ if refAtBump[i] == 1 && seg.lastAccessed.Load() < idleThreshold
{
+ seg.DecRef()
closedCount++
}
+ seg.DecRef()
}
return closedCount
diff --git a/banyand/internal/storage/segment_test.go
b/banyand/internal/storage/segment_test.go
index d203d1d7f..41fcfccf5 100644
--- a/banyand/internal/storage/segment_test.go
+++ b/banyand/internal/storage/segment_test.go
@@ -352,7 +352,7 @@ func TestCloseIdleAndSelectSegments(t *testing.T) {
seg2.lastAccessed.Store(time.Now().UnixNano())
// Close idle segments
- closedCount := sc.closeIdleSegments(context.Background())
+ closedCount := sc.closeIdleSegments()
// We should have closed 2 segments (seg1 and seg3)
assert.Equal(t, 2, closedCount)
@@ -611,7 +611,7 @@ func TestDeleteExpiredSegmentsWithClosedSegments(t
*testing.T) {
segments[5].lastAccessed.Store(activeTime) // day6 - not expired
// Close idle segments
- closedCount := sc.closeIdleSegments(context.Background())
+ closedCount := sc.closeIdleSegments()
assert.Equal(t, 3, closedCount, "Should have closed 3 segments")
// Verify segments 0, 2, and 4 are closed
diff --git a/banyand/internal/storage/tsdb_test.go
b/banyand/internal/storage/tsdb_test.go
index a826e006f..76908953a 100644
--- a/banyand/internal/storage/tsdb_test.go
+++ b/banyand/internal/storage/tsdb_test.go
@@ -606,38 +606,40 @@ func TestCollectWithPartialClosedSegments(t *testing.T) {
var segments []Segment[*MockTSTable, any]
- // Create segments and keep track of them
+ // Create segments and keep track of them.
+ // openSegment.initialize sets refCount=1, then createSegment.incRef
+ // bumps it to 2. The DecRef below releases the createSegment ref,
+ // leaving refCount=1 (the initialize ref) so the idle reclaimer can
+ // CAS-bump and see refAtBump==1.
for _, date := range segmentDates {
mc.Set(date)
- seg, err := tsdb.CreateSegmentIfNotExist(date)
- require.NoError(t, err)
+ seg, segErr := tsdb.CreateSegmentIfNotExist(date)
+ require.NoError(t, segErr)
require.NotNil(t, seg)
segments = append(segments, seg)
- seg.DecRef() // Release reference
+ seg.DecRef() // Release createSegment's ref; refCount 2->1
}
// Set a specific tick time for testing
tickTime := int64(123456789)
db.latestTickTime.Store(tickTime)
- // Manually close some segments (first and third)
- segments[0].DecRef() // Close first segment
- segments[2].DecRef() // Close third segment
-
- // Manually set the closed segments to have idle time in the past
+ // Mark the first and third segments as idle by backdating their
+ // lastAccessed timestamp. Do NOT DecRef them — they must stay alive
+ // (refCount=1) so closeIdleSegments can CAS-bump (refAtBump==1) and
+ // close them.
sc := db.segmentController
ss, _ := sc.segments(context.Background(), false)
for _, s := range ss {
- // Find segments we want to mark as idle (first and third)
if s.Start.Equal(segmentDates[0]) ||
s.Start.Equal(segmentDates[2]) {
s.lastAccessed.Store(time.Now().Add(-2 *
time.Hour).UnixNano())
- s.DecRef() // Force close
}
s.DecRef() // Release our reference from segments()
}
- // Call closeIdleSegments to ensure our target segments are closed
- closedCount := sc.closeIdleSegments(context.Background())
+ // closeIdleSegments should close the 2 idle segments (refAtBump==1 and
+ // lastAccessed older than idleThreshold) and leave the other 2 open.
+ closedCount := sc.closeIdleSegments()
require.Equal(t, 2, closedCount, "Should have closed 2 segments")
// Verify segments 0 and 2 are closed, 1 and 3 are open
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 3763888fb..93e857a70 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -720,7 +720,9 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group)
(resourceSchema.DB, error
shardNum := ro.ShardNum
ttl := ro.Ttl
segInterval := ro.SegmentInterval
- segmentIdleTimeout := time.Duration(0)
+ // Non-zero default so the idle-segment reclaimer ticker actually starts
+ // (storage/rotation.go gates it on >=1s). Staged Close paths override
below.
+ segmentIdleTimeout := time.Hour
disableRetention := false
disableRotation := false
if len(ro.Stages) > 0 && len(s.nodeLabels) > 0 {
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 1cdb02945..99db76512 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -559,7 +559,9 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group)
(resourceSchema.DB, error
shardNum := ro.ShardNum
ttl := ro.Ttl
segInterval := ro.SegmentInterval
- segmentIdleTimeout := time.Duration(0)
+ // Non-zero default so the idle-segment reclaimer ticker actually starts
+ // (storage/rotation.go gates it on >=1s). Staged Close paths override
below.
+ segmentIdleTimeout := time.Hour
disableRetention := false
disableRotation := false
if len(ro.Stages) > 0 && len(s.nodeLabels) > 0 {
diff --git a/banyand/trace/metadata.go b/banyand/trace/metadata.go
index da9789a37..d2719e03c 100644
--- a/banyand/trace/metadata.go
+++ b/banyand/trace/metadata.go
@@ -581,7 +581,9 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group)
(resourceSchema.DB, error
shardNum := ro.ShardNum
ttl := ro.Ttl
segInterval := ro.SegmentInterval
- segmentIdleTimeout := time.Duration(0)
+ // Non-zero default so the idle-segment reclaimer ticker actually starts
+ // (storage/rotation.go gates it on >=1s). Staged Close paths override
below.
+ segmentIdleTimeout := time.Hour
disableRetention := false
disableRotation := false
if len(ro.Stages) > 0 && len(s.nodeLabels) > 0 {
diff --git a/banyand/trace/metadata_test.go b/banyand/trace/metadata_test.go
index dae19e9ba..98c58b34d 100644
--- a/banyand/trace/metadata_test.go
+++ b/banyand/trace/metadata_test.go
@@ -372,9 +372,17 @@ var _ = Describe("Metadata", func() {
Eventually(func() int64 {
return getFilePartCount(svcs, groupName)
},
flags.EventuallyTimeout).Should(BeNumerically(">=", 1))
+ filePartCountAfterFirstBatch :=
getFilePartCount(svcs, groupName)
changeTraceExtraTagType(svcs, traceName,
groupName)
writeSchemaChangeTraceData(svcs, traceName,
groupName, now.Add(-1*time.Hour), 3,
writeTraceDataOptions{extraTag:
extraTagString, traceIDPrefix: "trace_new_"})
+ // Wait for the second batch to flush to disk,
creating additional
+ // file parts that the merge loop can pick up.
Without this gate the
+ // merge Eventually below also has to absorb
flush latency, which can
+ // exceed 30 s on resource-constrained CI
runners with -race.
+ Eventually(func() int64 {
+ return getFilePartCount(svcs, groupName)
+ },
flags.EventuallyTimeout).Should(BeNumerically(">",
filePartCountAfterFirstBatch))
partCountBeforeMerge := getTotalPartCount(svcs,
groupName)
Eventually(func() int64 {
return getTotalPartCount(svcs,
groupName)