This is an automated email from the ASF dual-hosted git repository. ButterBright pushed a commit to branch v0.10.x in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit b2ced812aad0d15afc0e021a5d3e8535f4305566 Author: Gao Hongtao <[email protected]> AuthorDate: Thu May 21 14:41:49 2026 +0800 fix: release bluge index writers on segment rotation (#13874) (#1128) --- CHANGES.md | 1 + banyand/internal/storage/rotation.go | 11 +++++- banyand/internal/storage/segment.go | 71 +++++++++++++++++++++++++++++------ banyand/internal/storage/tsdb_test.go | 26 +++++++------ banyand/measure/metadata.go | 4 +- banyand/stream/metadata.go | 4 +- banyand/trace/metadata.go | 4 +- banyand/trace/metadata_test.go | 8 ++++ 8 files changed, 101 insertions(+), 28 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index fe42fa088..8a64b558d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -40,6 +40,7 @@ Release Notes. - Fix `CollectDataInfo` and `CollectLiaisonInfo` not handling `CATALOG_PROPERTY` groups. - Fix lifecycle migration where the receiving node could create segments shorter than the configured `SegmentInterval`. - Fail fast on incompatible storage version at boot. Previously the server would start in a degraded `SERVING` state with affected groups un-loaded because the property schema-registry retry loop swallowed the version-incompatibility panic. Compatible versions are listed in `banyand/internal/storage/versions.yml`. +- Release bluge index writers on segment rotation so `analysisWorker` pools sized from `GOMAXPROCS` don't accumulate across rotations. Two layered defects kept the existing idle-segment reclaim path from running: `segmentIdleTimeout` defaulted to `0` (which disabled the 10-minute reclaim ticker), and `incRef` refreshed `lastAccessed` on every rotation tick so `closeIdleSegments` never observed an idle segment. The fix defaults `segmentIdleTimeout` to `time.Hour`, moves the `lastAccessed` bump to real read/write call [...] 
### Chores diff --git a/banyand/internal/storage/rotation.go b/banyand/internal/storage/rotation.go index 3923cc99c..b97e80e00 100644 --- a/banyand/internal/storage/rotation.go +++ b/banyand/internal/storage/rotation.go @@ -55,15 +55,24 @@ func (d *database[T, O]) startRotationTask() error { var idleCheckTicker *time.Ticker var idleCheckC <-chan time.Time - // Only create the ticker if idleTimeout is at least 1 second + // Only create the ticker if idleTimeout is at least 1 second. + // When disabled, idle segments are never reclaimed and their + // bluge writers (analysisWorker pools) accumulate across rotations. if d.segmentController.idleTimeout >= time.Second { idleCheckTicker = time.NewTicker(10 * time.Minute) idleCheckC = idleCheckTicker.C + d.logger.Info(). + Stringer("idle_timeout", d.segmentController.idleTimeout). + Msg("idle segment reclaimer enabled") defer func() { if idleCheckTicker != nil { idleCheckTicker.Stop() } }() + } else { + d.logger.Warn(). + Stringer("idle_timeout", d.segmentController.idleTimeout). + Msg("idle segment reclaimer disabled (idle_timeout < 1s); bluge writers will not be released on segment rotation") } for { diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go index 75a2994b0..e0a9ab787 100644 --- a/banyand/internal/storage/segment.go +++ b/banyand/internal/storage/segment.go @@ -162,6 +162,14 @@ func (s *segment[T, O]) TablesWithShardIDs() (tt []T, shardIDs []common.ShardID, // guaranteed that the segment is alive (s.index != nil, shards loaded) and // will stay alive until the matching DecRef. // +// incRef intentionally does NOT touch lastAccessed: the idle-segment +// reclaimer (closeIdleSegments) decides eligibility from that field, and +// the rotation-tick scan iterates every segment via incRef on each tick. +// Refreshing lastAccessed here would defeat the reclaimer for any segment +// outside the active write window. 
Callers that represent a real read or +// write touch must bump lastAccessed explicitly (see selectSegments and +// createSegment); housekeeping iterators must not. +// // The CAS loop closes a TOCTOU race that the older "Load + AddInt32" // shape suffered from: a goroutine could read refCount=1, race against a // concurrent DecRef that drives it to 0 and triggers performCleanup @@ -177,7 +185,6 @@ func (s *segment[T, O]) TablesWithShardIDs() (tt []T, shardIDs []common.ShardID, // DecRef that drives refCount to 0 sees no concurrent +1 sneak in // before performCleanup runs. func (s *segment[T, O]) incRef(ctx context.Context) error { - s.lastAccessed.Store(time.Now().UnixNano()) for { current := atomic.LoadInt32(&s.refCount) if current <= 0 { @@ -424,6 +431,7 @@ func (sc *segmentController[T, O]) selectSegments(timeRange timestamp.TimeRange) defer sc.RUnlock() last := len(sc.lst) - 1 ctx := context.WithValue(context.Background(), logger.ContextKey, sc.l) + now := time.Now().UnixNano() for i := range sc.lst { s := sc.lst[last-i] if s.GetTimeRange().End.Before(timeRange.Start) { @@ -433,6 +441,7 @@ func (sc *segmentController[T, O]) selectSegments(timeRange timestamp.TimeRange) if err = s.incRef(ctx); err != nil { return nil, err } + s.lastAccessed.Store(now) tt = append(tt, s) } } @@ -444,7 +453,11 @@ func (sc *segmentController[T, O]) createSegment(ts time.Time) (*segment[T, O], if err != nil { return nil, err } - return s, s.incRef(context.WithValue(context.Background(), logger.ContextKey, sc.l)) + if err = s.incRef(context.WithValue(context.Background(), logger.ContextKey, sc.l)); err != nil { + return nil, err + } + s.lastAccessed.Store(time.Now().UnixNano()) + return s, nil } func (sc *segmentController[T, O]) segments(reopenClosed bool) (ss []*segment[T, O], err error) { @@ -473,20 +486,54 @@ func (sc *segmentController[T, O]) closeIdleSegments() int { now := time.Now().UnixNano() idleThreshold := now - maxIdleTime.Nanoseconds() - segs, _ := sc.segments(false) 
- closedCount := 0 + // Take our own snapshot rather than going through segments(false): that + // helper's "Load>0 then Add" bump is non-atomic and can resurrect a + // segment that DecRef just cleaned up, and its caller has no way to tell + // which entries it actually bumped. closeIdleSegments must know, because + // it issues an unconditional DecRef per entry to release the bump; if a + // concurrent selectSegments reopens an entry we didn't bump, that DecRef + // would drop the query's only live ref and trigger performCleanup under + // active use. Iterate sc.lst under RLock and CAS-bump each open segment; + // closed segments are recorded as "not bumped" and skipped in the loop. + sc.RLock() + segs := make([]*segment[T, O], 0, len(sc.lst)) + bumped := make([]bool, 0, len(sc.lst)) + refAtBump := make([]int32, 0, len(sc.lst)) + for _, s := range sc.lst { + didBump := false + var snapRef int32 + for { + current := atomic.LoadInt32(&s.refCount) + if current <= 0 { + break + } + if atomic.CompareAndSwapInt32(&s.refCount, current, current+1) { + didBump = true + snapRef = current + break + } + } + segs = append(segs, s) + bumped = append(bumped, didBump) + refAtBump = append(refAtBump, snapRef) + } + sc.RUnlock() - for _, seg := range segs { - lastAccess := seg.lastAccessed.Load() - // Only consider segments that have been idle for longer than the threshold - // and have active references (are not already closed) - if lastAccess < idleThreshold && atomic.LoadInt32(&seg.refCount) > 0 { - seg.DecRef() + closedCount := 0 + for i, seg := range segs { + if !bumped[i] { + continue } - seg.DecRef() - if atomic.LoadInt32(&seg.refCount) == 0 { + // Only close when the snapshot proved refCount==1 (baseline only): + // a higher value means other callers hold active references, and + // decrementing past our bump would steal one of their refs, + // potentially triggering performCleanup under active use on a + // subsequent tick. 
+ if refAtBump[i] == 1 && seg.lastAccessed.Load() < idleThreshold { + seg.DecRef() closedCount++ } + seg.DecRef() } return closedCount diff --git a/banyand/internal/storage/tsdb_test.go b/banyand/internal/storage/tsdb_test.go index 4852b9bd6..9b97d4ca2 100644 --- a/banyand/internal/storage/tsdb_test.go +++ b/banyand/internal/storage/tsdb_test.go @@ -606,37 +606,39 @@ func TestCollectWithPartialClosedSegments(t *testing.T) { var segments []Segment[*MockTSTable, any] - // Create segments and keep track of them + // Create segments and keep track of them. + // openSegment.initialize sets refCount=1, then createSegment.incRef + // bumps it to 2. The DecRef below releases the createSegment ref, + // leaving refCount=1 (the initialize ref) so the idle reclaimer can + // CAS-bump and see refAtBump==1. for _, date := range segmentDates { mc.Set(date) - seg, err := tsdb.CreateSegmentIfNotExist(date) - require.NoError(t, err) + seg, segErr := tsdb.CreateSegmentIfNotExist(date) + require.NoError(t, segErr) require.NotNil(t, seg) segments = append(segments, seg) - seg.DecRef() // Release reference + seg.DecRef() // Release createSegment's ref; refCount 2->1 } // Set a specific tick time for testing tickTime := int64(123456789) db.latestTickTime.Store(tickTime) - // Manually close some segments (first and third) - segments[0].DecRef() // Close first segment - segments[2].DecRef() // Close third segment - - // Manually set the closed segments to have idle time in the past + // Mark the first and third segments as idle by backdating their + // lastAccessed timestamp. Do NOT DecRef them — they must stay alive + // (refCount=1) so closeIdleSegments can CAS-bump (refAtBump==1) and + // close them. 
sc := db.segmentController ss, _ := sc.segments(false) for _, s := range ss { - // Find segments we want to mark as idle (first and third) if s.Start.Equal(segmentDates[0]) || s.Start.Equal(segmentDates[2]) { s.lastAccessed.Store(time.Now().Add(-2 * time.Hour).UnixNano()) - s.DecRef() // Force close } s.DecRef() // Release our reference from segments() } - // Call closeIdleSegments to ensure our target segments are closed + // closeIdleSegments should close the 2 idle segments (refAtBump==1 and + // lastAccessed older than idleThreshold) and leave the other 2 open. closedCount := sc.closeIdleSegments() require.Equal(t, 2, closedCount, "Should have closed 2 segments") diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go index d206cd342..f8ea68692 100644 --- a/banyand/measure/metadata.go +++ b/banyand/measure/metadata.go @@ -680,7 +680,9 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error shardNum := ro.ShardNum ttl := ro.Ttl segInterval := ro.SegmentInterval - segmentIdleTimeout := time.Duration(0) + // Non-zero default so the idle-segment reclaimer ticker actually starts + // (storage/rotation.go gates it on >=1s). Staged Close paths override below. + segmentIdleTimeout := time.Hour disableRetention := false disableRotation := false if len(ro.Stages) > 0 && len(s.nodeLabels) > 0 { diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go index dd91f1481..bc48c00c9 100644 --- a/banyand/stream/metadata.go +++ b/banyand/stream/metadata.go @@ -537,7 +537,9 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error shardNum := ro.ShardNum ttl := ro.Ttl segInterval := ro.SegmentInterval - segmentIdleTimeout := time.Duration(0) + // Non-zero default so the idle-segment reclaimer ticker actually starts + // (storage/rotation.go gates it on >=1s). Staged Close paths override below. 
+ segmentIdleTimeout := time.Hour disableRetention := false disableRotation := false if len(ro.Stages) > 0 && len(s.nodeLabels) > 0 { diff --git a/banyand/trace/metadata.go b/banyand/trace/metadata.go index 3b02231b5..60bc4731d 100644 --- a/banyand/trace/metadata.go +++ b/banyand/trace/metadata.go @@ -560,7 +560,9 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error shardNum := ro.ShardNum ttl := ro.Ttl segInterval := ro.SegmentInterval - segmentIdleTimeout := time.Duration(0) + // Non-zero default so the idle-segment reclaimer ticker actually starts + // (storage/rotation.go gates it on >=1s). Staged Close paths override below. + segmentIdleTimeout := time.Hour disableRetention := false disableRotation := false if len(ro.Stages) > 0 && len(s.nodeLabels) > 0 { diff --git a/banyand/trace/metadata_test.go b/banyand/trace/metadata_test.go index 58994f525..44f18ac51 100644 --- a/banyand/trace/metadata_test.go +++ b/banyand/trace/metadata_test.go @@ -372,9 +372,17 @@ var _ = Describe("Metadata", func() { Eventually(func() int64 { return getFilePartCount(svcs, groupName) }, flags.EventuallyTimeout).Should(BeNumerically(">=", 1)) + filePartCountAfterFirstBatch := getFilePartCount(svcs, groupName) changeTraceExtraTagType(svcs, traceName, groupName) writeSchemaChangeTraceData(svcs, traceName, groupName, now.Add(-1*time.Hour), 3, writeTraceDataOptions{extraTag: extraTagString, traceIDPrefix: "trace_new_"}) + // Wait for the second batch to flush to disk, creating additional + // file parts that the merge loop can pick up. Without this gate the + // merge Eventually below also has to absorb flush latency, which can + // exceed 30 s on resource-constrained CI runners with -race. 
+ Eventually(func() int64 { + return getFilePartCount(svcs, groupName) + }, flags.EventuallyTimeout).Should(BeNumerically(">", filePartCountAfterFirstBatch)) partCountBeforeMerge := getTotalPartCount(svcs, groupName) Eventually(func() int64 { return getTotalPartCount(svcs, groupName)
