Copilot commented on code in PR #1128:
URL:
https://github.com/apache/skywalking-banyandb/pull/1128#discussion_r3244522452
##########
banyand/internal/storage/segment.go:
##########
@@ -463,23 +476,49 @@ func (sc *segmentController[T, O]) segments(ctx context.Context, reopenClosed bo
return r, nil
}
-func (sc *segmentController[T, O]) closeIdleSegments(ctx context.Context) int {
+func (sc *segmentController[T, O]) closeIdleSegments() int {
maxIdleTime := sc.idleTimeout
now := time.Now().UnixNano()
idleThreshold := now - maxIdleTime.Nanoseconds()
- segs, _ := sc.segments(ctx, false)
- closedCount := 0
+ // Take our own snapshot rather than going through segments(false): that
+ // helper's "Load>0 then Add" bump is non-atomic and can resurrect a
+ // segment that DecRef just cleaned up, and its caller has no way to tell
+ // which entries it actually bumped. closeIdleSegments must know, because
+ // it issues an unconditional DecRef per entry to release the bump; if a
+ // concurrent selectSegments reopens an entry we didn't bump, that DecRef
+ // would drop the query's only live ref and trigger performCleanup under
+ // active use. Iterate sc.lst under RLock and CAS-bump each open segment;
+ // closed segments are recorded as "not bumped" and skipped in the loop.
+ sc.RLock()
+ segs := make([]*segment[T, O], 0, len(sc.lst))
+ bumped := make([]bool, 0, len(sc.lst))
+ for _, s := range sc.lst {
+ didBump := false
+ for {
+ current := atomic.LoadInt32(&s.refCount)
+ if current <= 0 {
+ break
+ }
+ if atomic.CompareAndSwapInt32(&s.refCount, current, current+1) {
+ didBump = true
+ break
+ }
+ }
+ segs = append(segs, s)
+ bumped = append(bumped, didBump)
+ }
+ sc.RUnlock()
- for _, seg := range segs {
- lastAccess := seg.lastAccessed.Load()
- // Only consider segments that have been idle for longer than the threshold
- // and have active references (are not already closed)
- if lastAccess < idleThreshold && atomic.LoadInt32(&seg.refCount) > 0 {
+ closedCount := 0
+ for i, seg := range segs {
+ if bumped[i] {
+ if seg.lastAccessed.Load() < idleThreshold {
+ seg.DecRef()
Review Comment:
In closeIdleSegments(), the extra `seg.DecRef()` used to “close” an idle
segment runs regardless of how many non-reclaimer refs exist. This can break
the refcount invariant across ticks: if a segment looks idle but still has an
active ref, the first reclaimer tick drops the baseline ref, leaving only the
active ref. A later tick can then decrement that active ref to 0 (the segment
still looks idle), triggering performCleanup while the active user is still
operating on the segment. Consider attempting the “close” decrement only when
you can prove the segment is unused (e.g., only the baseline ref is present),
and otherwise releasing only the reclaimer’s temporary bump.
##########
banyand/internal/storage/rotation.go:
##########
@@ -122,7 +131,7 @@ func (d *database[T, O]) startRotationTask() error {
case <-idleCheckC:
func() {
d.logger.Debug().Msg("checking for idle segments")
- closedCount := d.segmentController.closeIdleSegments(taskCtx)
+ closedCount := d.segmentController.closeIdleSegments()
if closedCount > 0 {
d.logger.Info().Int("count", closedCount).Msg("closed idle segments")
Review Comment:
`closeIdleSegments()` returns the number of segments whose refCount is 0
after the scan (i.e., it can include segments that were already closed before
this tick). Logging this value as "closed idle segments" can be misleading and
may produce repeated Info logs every 10 minutes once any segment is closed.
Either adjust closeIdleSegments to return the number closed in *this* run, or
change the log message/level to reflect that it’s reporting total
currently-closed segments.