This is an automated email from the ASF dual-hosted git repository.
mrproliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new b8168b76a Fix fetch segment nil pointer in cold stage and propagation
the error when inspect the group (#1115)
b8168b76a is described below
commit b8168b76ab7c11de7d1603f3ebaac6cf5b1148a7
Author: mrproliu <[email protected]>
AuthorDate: Thu May 7 11:23:26 2026 +0800
Fix fetch segment nil pointer in cold stage and propagation the error when
inspect the group (#1115)
---
CHANGES.md | 2 +
api/proto/banyandb/fodc/v1/rpc.proto | 11 +
banyand/internal/storage/index.go | 49 +++-
banyand/internal/storage/segment.go | 42 ++-
banyand/internal/storage/segment_test.go | 318 +++++++++++++++++++++
banyand/liaison/grpc/deletion.go | 14 +-
banyand/liaison/grpc/deletion_test.go | 28 +-
banyand/liaison/grpc/registry.go | 9 +-
banyand/measure/metadata_internal_test.go | 48 ++++
banyand/metadata/client.go | 2 +-
banyand/metadata/metadata.go | 2 +-
banyand/metadata/schema/collector.go | 46 ++-
banyand/queue/sub/group_lifecycle.go | 14 +-
.../queue/sub/group_lifecycle_cold_tier_test.go | 298 +++++++++++++++++++
banyand/queue/sub/group_lifecycle_test.go | 116 +++++++-
banyand/stream/metadata_internal_test.go | 48 ++++
banyand/trace/metadata_internal_test.go | 48 ++++
docs/api-reference.md | 1 +
fodc/proxy/internal/lifecycle/manager.go | 28 +-
fodc/proxy/internal/lifecycle/manager_test.go | 61 ++++
20 files changed, 1146 insertions(+), 39 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 1a8a81e63..0f7457ef8 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -67,6 +67,8 @@ Release Notes.
- Fix FODC lifecycle cache poisoning where transient `InspectAll` failures
were cached for 10 minutes and masked liaison recovery; raise FODC agent and
proxy timeouts from 10s to 40s.
- Fix FODC `/cluster/lifecycle` dropping zero-valued group fields (e.g.
`replicas=0`, `close=false`) under `encoding/json` + `omitempty`; switch to
`protojson` so all fields are emitted (nil nested messages serialize as `null`).
- Fix trace `block_writer` panic on out-of-order timestamps within the same
traceID, which dropped one trace-write batch per panic in multi-agent
SkyWalking deployments. Spans of a single trace originate from
independently-clocked services, and trace storage is organized by traceID
rather than timestamp, so per-traceID timestamp monotonicity is not a writer
invariant.
+- Fix nil-pointer panic on cold-tier data nodes when FODC `InspectAll` raced
with idle-segment cleanup.
+- Add `GroupLifecycleInfo.errors` to surface per-group collection failures
from FODC `InspectAll` instead of silently dropping the affected node entry.
### Chores
diff --git a/api/proto/banyandb/fodc/v1/rpc.proto
b/api/proto/banyandb/fodc/v1/rpc.proto
index dd7a5c67a..9315da8a6 100644
--- a/api/proto/banyandb/fodc/v1/rpc.proto
+++ b/api/proto/banyandb/fodc/v1/rpc.proto
@@ -114,6 +114,17 @@ message GroupLifecycleInfo {
string catalog = 2;
banyandb.common.v1.ResourceOpts resource_opts = 3;
repeated banyandb.database.v1.DataInfo data_info = 4;
+ // errors lists every failure observed while collecting data_info for this
+ // group: top-level CollectDataInfo failures (GetGroup, missing collector,
+ // dial failure -- prefixed "top-level: ") and per-node broadcast failures
+ // (prefixed "future error: ", "node error: ", "broadcast failed: ").
+ // Combined with len(data_info), consumers can tell the following four
+ // states apart:
+ // - data_info empty && errors empty -> no nodes reported (group inactive)
+ // - data_info empty && errors non-empty -> total failure
+ // - data_info non-empty && errors empty -> full success
+ // - data_info non-empty && errors non-empty -> partial failure
+ repeated string errors = 5;
}
message LifecycleReport {
diff --git a/banyand/internal/storage/index.go
b/banyand/internal/storage/index.go
index dff32d742..3ff0ae41c 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -37,14 +37,33 @@ import (
const seriesIndexDirName = "sidx"
func (s *segment[T, O]) IndexDB() IndexDB {
- return s.index
+ // Snapshot s.index into a local: the field can be flipped to nil by
+ // performCleanup at any time, so a second read could see a different
+ // value. Returning the local also avoids boxing a nil *seriesIndex
+ // into a typed-nil IndexDB interface, which would defeat the
+ // `if indexDB == nil` guard on the caller side.
+ idx := s.index
+ if idx == nil {
+ return nil
+ }
+ return idx
}
func (s *segment[T, O]) Lookup(ctx context.Context, series []*pbv1.Series)
(pbv1.SeriesList, error) {
- sl, err := s.index.filter(ctx, series, nil, nil, nil)
+ idx := s.index
+ if idx == nil {
+ return nil, ErrSegmentClosed
+ }
+ sl, err := idx.filter(ctx, series, nil, nil, nil)
return sl.SeriesList, err
}
+// seriesIndex is the per-segment series index. Every method on this type
+// starts with `if s == nil { ... }` purely to absorb the typed-nil leak
+// that `segment.IndexDB()` could expose if a caller bypassed the producer
+// guard there. The guards intentionally do NOT make the type fully
+// nil-safe -- they assume that on a non-nil receiver, store / l / metrics
+// / p are all populated by newSeriesIndex (the only constructor).
type seriesIndex struct {
store index.SeriesStore
l *logger.Logger
@@ -79,28 +98,45 @@ func newSeriesIndex(ctx context.Context, root string,
flushTimeoutSeconds int64,
}
func (s *seriesIndex) Insert(docs index.Documents) error {
+ if s == nil {
+ return ErrSegmentClosed
+ }
return s.store.InsertSeriesBatch(index.Batch{
Documents: docs,
})
}
func (s *seriesIndex) Update(docs index.Documents) error {
+ if s == nil {
+ return ErrSegmentClosed
+ }
return s.store.UpdateSeriesBatch(index.Batch{
Documents: docs,
})
}
func (s *seriesIndex) EnableExternalSegments() (index.ExternalSegmentStreamer,
error) {
+ if s == nil {
+ return nil, ErrSegmentClosed
+ }
return s.store.EnableExternalSegments()
}
+// Stats degrades to (0, 0) on a nil receiver as defense-in-depth in case
+// a typed-nil interface leaks past segment.IndexDB().
func (s *seriesIndex) Stats() (dataCount int64, dataSizeBytes int64) {
+ if s == nil {
+ return 0, 0
+ }
return s.store.Stats()
}
func (s *seriesIndex) filter(ctx context.Context, series []*pbv1.Series,
projection []index.FieldKey, secondaryQuery index.Query, timeRange
*timestamp.TimeRange,
) (data SeriesData, err error) {
+ if s == nil {
+ return SeriesData{}, ErrSegmentClosed
+ }
if len(series) == 0 && secondaryQuery == nil && timeRange == nil {
return data, nil
}
@@ -231,6 +267,9 @@ func convertIndexSeriesToSeriesList(indexSeries
[]index.SeriesDocument, hasField
func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts
IndexSearchOpts,
) (sd SeriesData, sortedValues [][]byte, err error) {
+ if s == nil {
+ return SeriesData{}, nil, ErrSegmentClosed
+ }
tracer := query.GetTracer(ctx)
if tracer != nil {
var span *query.Span
@@ -309,6 +348,9 @@ func (s *seriesIndex) Search(ctx context.Context, series
[]*pbv1.Series, opts In
}
func (s *seriesIndex) SearchWithoutSeries(ctx context.Context, opts
IndexSearchOpts) (sd SeriesData, sortedValues [][]byte, err error) {
+ if s == nil {
+ return SeriesData{}, nil, ErrSegmentClosed
+ }
tracer := query.GetTracer(ctx)
if tracer != nil {
var span *query.Span
@@ -384,6 +426,9 @@ func (s *seriesIndex) SearchWithoutSeries(ctx
context.Context, opts IndexSearchO
}
func (s *seriesIndex) Close() error {
+ if s == nil {
+ return nil
+ }
s.metrics.DeleteAll(s.p.SegLabelValues()...)
return s.store.Close()
}
diff --git a/banyand/internal/storage/segment.go
b/banyand/internal/storage/segment.go
index 04a333507..4f1490bec 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -159,13 +159,41 @@ func (s *segment[T, O]) TablesWithShardIDs() (tt []T,
shardIDs []common.ShardID,
return tt, shardIDs, cc
}
+// incRef acquires one reference on the segment, opening it via initialize
+// if it is currently closed (refCount<=0). On return, the caller is
+// guaranteed that the segment is alive (s.index != nil, shards loaded) and
+// will stay alive until the matching DecRef.
+//
+// The CAS loop closes a TOCTOU race that the older "Load + AddInt32"
+// shape suffered from: a goroutine could read refCount=1, race against a
+// concurrent DecRef that drives it to 0 and triggers performCleanup
+// (which sets s.index=nil and tears down shards), and then AddInt32 it
+// back to 1. The caller would walk away with refCount=1 -- formally a
+// valid reference -- but pointing at an already-cleaned-up segment, so
+// every subsequent IndexDB() / Tables() call would observe zero state.
+//
+// The CAS variant retries whenever a concurrent DecRef beats us to the
+// counter: if CAS fails, we re-Load and either succeed at the new value
+// or fall through to initialize() to reopen the segment under s.mu.
+// Symmetric to the CAS loop in DecRef, which guarantees the dual: a
+// DecRef that drives refCount to 0 sees no concurrent +1 sneak in
+// before performCleanup runs.
func (s *segment[T, O]) incRef(ctx context.Context) error {
s.lastAccessed.Store(time.Now().UnixNano())
- if atomic.LoadInt32(&s.refCount) <= 0 {
- return s.initialize(ctx)
+ for {
+ current := atomic.LoadInt32(&s.refCount)
+ if current <= 0 {
+ // Either the segment was never opened or DecRef just drove
+ // refCount to 0; reopen under the mutex.
+ return s.initialize(ctx)
+ }
+ // CAS so a concurrent DecRef cannot flip refCount to 0 and run
+ // performCleanup between our Load and our increment. On failure
+ // we re-Load and re-evaluate the branch.
+ if atomic.CompareAndSwapInt32(&s.refCount, current, current+1) {
+ return nil
+ }
}
- atomic.AddInt32(&s.refCount, 1)
- return nil
}
func (s *segment[T, O]) initialize(ctx context.Context) error {
@@ -173,6 +201,12 @@ func (s *segment[T, O]) initialize(ctx context.Context)
error {
defer s.mu.Unlock()
if atomic.LoadInt32(&s.refCount) > 0 {
+ // Another goroutine reopened the segment while we waited for the
+ // mutex. Add the +1 our caller (incRef) skipped before entering
+ // the slow path; otherwise concurrent reopens would share one
+ // refCount and the first DecRef would cleanup while we are still
+ // using it.
+ atomic.AddInt32(&s.refCount, 1)
return nil
}
diff --git a/banyand/internal/storage/segment_test.go
b/banyand/internal/storage/segment_test.go
index 9a55dd358..f50e07603 100644
--- a/banyand/internal/storage/segment_test.go
+++ b/banyand/internal/storage/segment_test.go
@@ -24,6 +24,8 @@ import (
"os"
"path/filepath"
"strconv"
+ "sync"
+ "sync/atomic"
"testing"
"time"
@@ -860,3 +862,319 @@ func TestOpenFallbackOldFormatMetadata(t *testing.T) {
seg.DecRef()
}
}
+
+// TestSegment_IndexDB_ReturnsUntypedNilWhenClosed is a regression guard for
+// the cold-tier nil-pointer panic. After a segment has been idle-closed
+// (s.index == nil), segment.IndexDB() must return an untyped nil interface
+// so that the standard caller pattern `if indexDB == nil { ... }` short
+// circuits correctly. Returning the underlying *seriesIndex would box a
+// typed-nil interface, defeat the nil check, and crash on the next method
+// call.
+func TestSegment_IndexDB_ReturnsUntypedNilWhenClosed(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+
+ ctx := context.Background()
+ l := logger.GetLogger("test-untyped-nil")
+ ctx = context.WithValue(ctx, logger.ContextKey, l)
+ ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+ return common.Position{Database: "test-db", Stage: "test-stage"}
+ })
+
+ opts := TSDBOpts[mockTSTable, mockTSTableOpener]{
+ TSTableCreator: func(_ fs.FileSystem, _ string, _
common.Position, _ *logger.Logger,
+ _ timestamp.TimeRange, _ mockTSTableOpener, _ any,
+ ) (mockTSTable, error) {
+ return mockTSTable{ID: common.ShardID(0)}, nil
+ },
+ ShardNum: 1,
+ SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+ TTL: IntervalRule{Unit: DAY, Num: 7},
+ SeriesIndexFlushTimeoutSeconds: 10,
+ SeriesIndexCacheMaxBytes: 1024 * 1024,
+ }
+
+ serviceCache := NewServiceCache().(*serviceCache)
+ sc := newSegmentController[mockTSTable, mockTSTableOpener](
+ ctx, tempDir, l, opts, nil, nil, time.Second,
+
fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"),
opts.MemoryLimit),
+ serviceCache, group,
+ )
+
+ now := time.Now().UTC()
+ startTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0,
time.UTC)
+ endTime := startTime.Add(24 * time.Hour)
+
+ segPath := filepath.Join(tempDir,
"segment-"+startTime.Format(dayFormat))
+ require.NoError(t, os.MkdirAll(segPath, DirPerm))
+ require.NoError(t, os.WriteFile(
+ filepath.Join(segPath, metadataFilename),
+ []byte(currentVersion), FilePerm))
+
+ seg, err := sc.openSegment(ctx, startTime, endTime, segPath,
+ startTime.Format(dayFormat), sc.groupCache)
+ require.NoError(t, err)
+
+ // Idle-close: drop the only reference so performCleanup runs and the
+ // segment ends up in the residual state seen on cold-tier nodes
+ // (refCount=0, s.index=nil, but segment still reachable for reopen).
+ seg.DecRef()
+ require.Nil(t, seg.index)
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
+
+ indexDB := seg.IndexDB()
+ require.True(t, indexDB == nil,
+ "IndexDB() must return untyped nil after idle close so the
caller's "+
+ "`if indexDB == nil` guard fires")
+
+ // Drive the exact caller pattern from collectSeriesIndexInfo. It must
+ // short-circuit on the nil check and never dispatch Stats() on a nil
+ // receiver.
+ var dataCount, dataSize int64
+ func() {
+ defer func() {
+ if r := recover(); r != nil {
+ t.Fatalf("caller pattern must not panic but
got: %v", r)
+ }
+ }()
+ if indexDB != nil {
+ dataCount, dataSize = indexDB.Stats()
+ }
+ }()
+ assert.Equal(t, int64(0), dataCount)
+ assert.Equal(t, int64(0), dataSize)
+}
+
+// TestSegment_ConcurrentReopen_RefCountConsistent stress-tests the
+// reference-count contract of segment.incRef under concurrent reopen.
+// Before the fix, segment.initialize() returned without AddInt32 when
+// another goroutine had already reopened the segment, so the second caller
+// "owned" a reference it had never accounted for. The first DecRef would
+// then trigger performCleanup while another goroutine was still using the
+// segment.
+//
+// After the fix, every concurrent incRef must add exactly one reference
+// regardless of which path it takes.
+//
+// Reliability note: a single 64-goroutine round can occasionally serialize
+// well enough that only the first caller enters the slow path
+// (initialize) and the rest take the fast path (CAS+1). That serialized
+// shape would also pass on the buggy code. To make the test deterministic
+// across machines, the experiment is repeated for `rounds` cycles, each
+// starting from the residual state. The bug becomes effectively
+// impossible to miss across all rounds combined.
+func TestSegment_ConcurrentReopen_RefCountConsistent(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+
+ ctx := context.Background()
+ l := logger.GetLogger("test-concurrent-reopen")
+ ctx = context.WithValue(ctx, logger.ContextKey, l)
+ ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+ return common.Position{Database: "test-db", Stage: "test-stage"}
+ })
+
+ opts := TSDBOpts[mockTSTable, mockTSTableOpener]{
+ TSTableCreator: func(_ fs.FileSystem, _ string, _
common.Position, _ *logger.Logger,
+ _ timestamp.TimeRange, _ mockTSTableOpener, _ any,
+ ) (mockTSTable, error) {
+ return mockTSTable{ID: common.ShardID(0)}, nil
+ },
+ ShardNum: 2,
+ SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+ TTL: IntervalRule{Unit: DAY, Num: 7},
+ SeriesIndexFlushTimeoutSeconds: 10,
+ SeriesIndexCacheMaxBytes: 1024 * 1024,
+ }
+
+ serviceCache := NewServiceCache().(*serviceCache)
+ sc := newSegmentController[mockTSTable, mockTSTableOpener](
+ ctx, tempDir, l, opts, nil, nil, time.Second,
+
fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"),
opts.MemoryLimit),
+ serviceCache, group,
+ )
+
+ now := time.Now().UTC()
+ startTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0,
time.UTC)
+ endTime := startTime.Add(24 * time.Hour)
+
+ segPath := filepath.Join(tempDir,
"segment-"+startTime.Format(dayFormat))
+ require.NoError(t, os.MkdirAll(segPath, DirPerm))
+ require.NoError(t, os.WriteFile(
+ filepath.Join(segPath, metadataFilename),
+ []byte(currentVersion), FilePerm))
+
+ seg, err := sc.openSegment(ctx, startTime, endTime, segPath,
+ startTime.Format(dayFormat), sc.groupCache)
+ require.NoError(t, err)
+
+ // Bring the segment into the cold-tier residual state.
+ seg.DecRef()
+ require.Nil(t, seg.index)
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
+
+ const (
+ N = 64
+ rounds = 10
+ )
+ for r := 0; r < rounds; r++ {
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount),
+ "round %d: segment must start at refCount=0", r)
+ require.Nil(t, seg.index, "round %d: segment.index must start
nil", r)
+
+ var (
+ wg sync.WaitGroup
+ start = make(chan struct{})
+ errCnt int64
+ )
+ wg.Add(N)
+ for i := 0; i < N; i++ {
+ go func() {
+ defer wg.Done()
+ <-start
+ if incErr := seg.incRef(ctx); incErr != nil {
+ atomic.AddInt64(&errCnt, 1)
+ }
+ }()
+ }
+ close(start)
+ wg.Wait()
+
+ require.Equal(t, int64(0), atomic.LoadInt64(&errCnt),
+ "round %d: incRef must not error", r)
+ require.Equal(t, int32(N), atomic.LoadInt32(&seg.refCount),
+ "round %d: every concurrent incRef must add exactly one
reference", r)
+ require.NotNil(t, seg.index, "round %d: segment.index must be
reopened", r)
+
+ // Drain references; the last DecRef must trigger performCleanup
+ // and bring the segment back to the residual state for the next
+ // round.
+ for i := 0; i < N; i++ {
+ seg.DecRef()
+ }
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount),
+ "round %d", r)
+ require.Nil(t, seg.index,
+ "round %d: segment.index must be cleared by
performCleanup", r)
+ }
+}
+
+// TestSegment_ConcurrentReopenAndClose_NoPanic exercises the cold-tier
+// reopen pattern under heavy concurrency: many goroutines repeatedly call
+// selectSegments, exercise IndexDB().Stats() on each returned segment, and
+// release with DecRef. With idle-closed segments as the starting state,
+// each iteration triggers initialize() to reopen the segment and the final
+// DecRef triggers performCleanup() to close it again, so closes and
+// reopens are interleaved across goroutines.
+//
+// Verifies (run with -race):
+// - no goroutine panics (covers both the typed-nil and refcount fixes)
+// - no data race is detected on the segment lifecycle
+// - all segments end up cleanly closed (refCount == 0, index == nil)
+func TestSegment_ConcurrentReopenAndClose_NoPanic(t *testing.T) {
+ tempDir, cleanup := setupTestEnvironment(t)
+ defer cleanup()
+
+ ctx := context.Background()
+ l := logger.GetLogger("test-concurrent-reopen-close")
+ ctx = context.WithValue(ctx, logger.ContextKey, l)
+ ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+ return common.Position{Database: "test-db", Stage: "test-stage"}
+ })
+
+ opts := TSDBOpts[mockTSTable, mockTSTableOpener]{
+ TSTableCreator: func(_ fs.FileSystem, _ string, _
common.Position, _ *logger.Logger,
+ _ timestamp.TimeRange, _ mockTSTableOpener, _ any,
+ ) (mockTSTable, error) {
+ return mockTSTable{ID: common.ShardID(0)}, nil
+ },
+ ShardNum: 2,
+ SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+ TTL: IntervalRule{Unit: DAY, Num: 7},
+ SeriesIndexFlushTimeoutSeconds: 10,
+ SeriesIndexCacheMaxBytes: 1024 * 1024,
+ }
+
+ serviceCache := NewServiceCache().(*serviceCache)
+ sc := newSegmentController[mockTSTable, mockTSTableOpener](
+ ctx, tempDir, l, opts, nil, nil, time.Second,
+
fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"),
opts.MemoryLimit),
+ serviceCache, group,
+ )
+
+ now := time.Now().UTC()
+ day1 := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0,
time.UTC)
+ day2 := day1.Add(24 * time.Hour)
+ day3 := day2.Add(24 * time.Hour)
+ days := []time.Time{day1, day2, day3}
+
+ for _, d := range days {
+ segPath := filepath.Join(tempDir,
"segment-"+d.Format(dayFormat))
+ require.NoError(t, os.MkdirAll(segPath, DirPerm))
+ require.NoError(t, os.WriteFile(
+ filepath.Join(segPath, metadataFilename),
+ []byte(currentVersion), FilePerm))
+ seg, openErr := sc.openSegment(ctx, d, d.Add(24*time.Hour),
+ segPath, d.Format(dayFormat), sc.groupCache)
+ require.NoError(t, openErr)
+ sc.Lock()
+ sc.lst = append(sc.lst, seg)
+ sc.sortLst()
+ sc.Unlock()
+ // Drop the open reference so each segment starts in the residual state.
+ seg.DecRef()
+ require.Nil(t, seg.index)
+ }
+
+ timeRange := timestamp.NewInclusiveTimeRange(day1,
day3.Add(24*time.Hour))
+
+ const (
+ goroutines = 8
+ cycles = 200
+ )
+ var (
+ wg sync.WaitGroup
+ start = make(chan struct{})
+ panicCnt int64
+ )
+ for i := 0; i < goroutines; i++ {
+ wg.Add(1)
+ go func(id int) {
+ defer wg.Done()
+ defer func() {
+ if r := recover(); r != nil {
+ atomic.AddInt64(&panicCnt, 1)
+ t.Errorf("goroutine-%d panicked: %v",
id, r)
+ }
+ }()
+ <-start
+ for c := 0; c < cycles; c++ {
+ segs, selErr := sc.selectSegments(timeRange)
+ if selErr != nil {
+ t.Errorf("goroutine-%d cycle %d
selectSegments: %v", id, c, selErr)
+ return
+ }
+ for _, s := range segs {
+ if idx := s.IndexDB(); idx != nil {
+ _, _ = idx.Stats()
+ }
+ s.DecRef()
+ }
+ }
+ }(i)
+ }
+ close(start)
+ wg.Wait()
+
+ require.Equal(t, int64(0), atomic.LoadInt64(&panicCnt),
+ "no goroutine should panic during the concurrent reopen/close
cycle")
+
+ sc.RLock()
+ final := append([]*segment[mockTSTable, mockTSTableOpener]{}, sc.lst...)
+ sc.RUnlock()
+ for _, s := range final {
+ require.Equal(t, int32(0), atomic.LoadInt32(&s.refCount),
+ "segment %s leaked references", s.suffix)
+ require.Nil(t, s.index, "segment %s should be cleaned up",
s.suffix)
+ }
+}
diff --git a/banyand/liaison/grpc/deletion.go b/banyand/liaison/grpc/deletion.go
index 4670b8ca2..9cbbfbc6d 100644
--- a/banyand/liaison/grpc/deletion.go
+++ b/banyand/liaison/grpc/deletion.go
@@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
+ "strings"
"sync"
"time"
@@ -158,11 +159,16 @@ func (m *groupDeletionTaskManager) startDeletion(ctx
context.Context, group stri
return fmt.Errorf("deletion task for group %s is already in
progress", group)
}
- dataInfo, dataErr := m.schemaRegistry.CollectDataInfo(ctx, group)
+ dataInfo, collectionErrs, dataErr :=
m.schemaRegistry.CollectDataInfo(ctx, group)
if dataErr != nil {
m.tasks.Delete(group)
return fmt.Errorf("failed to collect data info for group %s:
%w", group, dataErr)
}
+ if len(collectionErrs) > 0 {
+ m.tasks.Delete(group)
+ return fmt.Errorf("incomplete data info for group %s, refusing
to start deletion: %s",
+ group, strings.Join(collectionErrs, "; "))
+ }
var totalDataSize int64
for _, di := range dataInfo {
totalDataSize += di.GetDataSizeBytes()
@@ -452,10 +458,14 @@ func (m *groupDeletionTaskManager) getDeletionTask(ctx
context.Context, group st
}
func (m *groupDeletionTaskManager) hasNonEmptyResources(ctx context.Context,
group string) (bool, error) {
- dataInfo, dataErr := m.schemaRegistry.CollectDataInfo(ctx, group)
+ dataInfo, collectionErrs, dataErr :=
m.schemaRegistry.CollectDataInfo(ctx, group)
if dataErr != nil {
return false, fmt.Errorf("failed to collect data info: %w",
dataErr)
}
+ if len(collectionErrs) > 0 {
+ return false, fmt.Errorf("incomplete data info for group %s,
cannot determine emptiness: %s",
+ group, strings.Join(collectionErrs, "; "))
+ }
for _, di := range dataInfo {
if di.GetDataSizeBytes() > 0 {
return true, nil
diff --git a/banyand/liaison/grpc/deletion_test.go
b/banyand/liaison/grpc/deletion_test.go
index 6cf47e957..812b5f460 100644
--- a/banyand/liaison/grpc/deletion_test.go
+++ b/banyand/liaison/grpc/deletion_test.go
@@ -121,7 +121,7 @@ func TestHasNonEmptyResources(t *testing.T) {
defer ctrl.Finish()
mockRepo := metadata.NewMockRepo(ctrl)
- mockRepo.EXPECT().CollectDataInfo(gomock.Any(),
"test-group").Return(tt.infos, nil)
+ mockRepo.EXPECT().CollectDataInfo(gomock.Any(),
"test-group").Return(tt.infos, nil, nil)
m := &groupDeletionTaskManager{schemaRegistry: mockRepo}
hasResources, checkErr :=
m.hasNonEmptyResources(context.Background(), "test-group")
@@ -129,6 +129,30 @@ func TestHasNonEmptyResources(t *testing.T) {
assert.Equal(t, tt.expected, hasResources)
})
}
+
+ // Partial-failure path: when CollectDataInfo reports per-node errors,
+ // hasNonEmptyResources must refuse to answer (returning an error)
+ // rather than silently treating the half-populated DataInfo as
+ // authoritative. A wrong "false" here would let the caller delete
+ // data on the silent node.
+ t.Run("partial failure refuses to answer", func(t *testing.T) {
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+
+ mockRepo := metadata.NewMockRepo(ctrl)
+ mockRepo.EXPECT().CollectDataInfo(gomock.Any(),
"test-group").Return(
+ []*databasev1.DataInfo{{DataSizeBytes: 0}}, // looks
empty
+ []string{"node error: cold-0 panic"}, // but
cold-0 did not report
+ nil,
+ )
+
+ m := &groupDeletionTaskManager{schemaRegistry: mockRepo}
+ hasResources, checkErr :=
m.hasNonEmptyResources(context.Background(), "test-group")
+ require.Error(t, checkErr, "partial collection failure must
surface as an error, not (false, nil)")
+ assert.Contains(t, checkErr.Error(), "incomplete data info")
+ assert.Contains(t, checkErr.Error(), "cold-0 panic")
+ assert.False(t, hasResources, "the bool return is irrelevant
when the error is non-nil; caller must check err first")
+ })
}
func TestDeletion(t *testing.T) {
@@ -192,7 +216,7 @@ func TestDeletion(t *testing.T) {
}
mockRepo := metadata.NewMockRepo(ctrl)
- mockRepo.EXPECT().CollectDataInfo(gomock.Any(),
group).Return([]*databasev1.DataInfo{{DataSizeBytes: 512}}, nil)
+ mockRepo.EXPECT().CollectDataInfo(gomock.Any(),
group).Return([]*databasev1.DataInfo{{DataSizeBytes: 512}}, nil, nil)
mockRepo.EXPECT().IndexRuleBindingRegistry().Return(&stubIndexRuleBinding{})
mockRepo.EXPECT().IndexRuleRegistry().Return(&stubIndexRule{})
diff --git a/banyand/liaison/grpc/registry.go b/banyand/liaison/grpc/registry.go
index 58f365213..3b06e2d3d 100644
--- a/banyand/liaison/grpc/registry.go
+++ b/banyand/liaison/grpc/registry.go
@@ -20,6 +20,8 @@ package grpc
import (
"context"
"errors"
+ "fmt"
+ "strings"
"time"
"google.golang.org/grpc/codes"
@@ -768,11 +770,16 @@ func (rs *groupRegistryServer) Inspect(ctx
context.Context, req *databasev1.Grou
rs.metrics.totalRegistryErr.Inc(1, g, "group", "inspect")
return nil, schemaErr
}
- dataInfo, dataErr := rs.schemaRegistry.CollectDataInfo(ctx, g)
+ dataInfo, collectionErrs, dataErr :=
rs.schemaRegistry.CollectDataInfo(ctx, g)
if dataErr != nil {
rs.metrics.totalRegistryErr.Inc(1, g, "group", "inspect")
return nil, dataErr
}
+ if len(collectionErrs) > 0 {
+ rs.metrics.totalRegistryErr.Inc(1, g, "group", "inspect")
+ return nil, fmt.Errorf("incomplete data info for group %s: %s",
+ g, strings.Join(collectionErrs, "; "))
+ }
liaisonInfo, liaisonErr := rs.schemaRegistry.CollectLiaisonInfo(ctx, g)
if liaisonErr != nil {
rs.metrics.totalRegistryErr.Inc(1, g, "group", "inspect")
diff --git a/banyand/measure/metadata_internal_test.go
b/banyand/measure/metadata_internal_test.go
new file mode 100644
index 000000000..87fa5db3a
--- /dev/null
+++ b/banyand/measure/metadata_internal_test.go
@@ -0,0 +1,48 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+)
+
+// nilIndexDBSegment is a stub Segment whose IndexDB() returns an untyped
+// nil. The embedded interface lets us satisfy the Segment[*tsTable, option]
+// signature without implementing every method (only IndexDB is exercised
+// by collectSeriesIndexInfo).
+type nilIndexDBSegment struct {
+ storage.Segment[*tsTable, option]
+}
+
+func (nilIndexDBSegment) IndexDB() storage.IndexDB { return nil }
+
+// TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB locks in the contract
+// that collectSeriesIndexInfo returns an empty SeriesIndexInfo (no panic)
+// when the segment is in the cold-tier residual state where its underlying
+// series index has been torn down by performCleanup.
+func TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB(t *testing.T) {
+ sr := &schemaRepo{}
+ info := sr.collectSeriesIndexInfo(nilIndexDBSegment{})
+ require.NotNil(t, info)
+ require.Equal(t, int64(0), info.DataCount)
+ require.Equal(t, int64(0), info.DataSizeBytes)
+}
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index fcd72a79a..fd11c9e46 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -502,7 +502,7 @@ func (s *clientService) Subjects(ctx context.Context,
indexRule *databasev1.Inde
return foundSubjects, subjectErr
}
-func (s *clientService) CollectDataInfo(ctx context.Context, group string)
([]*databasev1.DataInfo, error) {
+func (s *clientService) CollectDataInfo(ctx context.Context, group string)
([]*databasev1.DataInfo, []string, error) {
return s.infoCollectorRegistry.CollectDataInfo(ctx, group)
}
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go
index 33865e820..59ef2332e 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/metadata.go
@@ -52,7 +52,7 @@ type Repo interface {
RegisterHandler(string, schema.Kind, schema.EventHandler)
NodeRegistry() schema.Node
PropertyRegistry() schema.Property
- CollectDataInfo(context.Context, string) ([]*databasev1.DataInfo, error)
+ CollectDataInfo(context.Context, string) ([]*databasev1.DataInfo,
[]string, error)
CollectLiaisonInfo(context.Context, string) ([]*databasev1.LiaisonInfo,
error)
DropGroup(ctx context.Context, catalog commonv1.Catalog, group string)
error
}
diff --git a/banyand/metadata/schema/collector.go
b/banyand/metadata/schema/collector.go
index 9b297fd66..6ece113de 100644
--- a/banyand/metadata/schema/collector.go
+++ b/banyand/metadata/schema/collector.go
@@ -39,6 +39,20 @@ import (
// merge/flush load can take several seconds) while still bounding the call.
const inspectBroadcastTimeout = 30 * time.Second
+// Prefixes used by every producer that writes into
+// GroupLifecycleInfo.errors so downstream consumers can categorize
+// failures via a stable prefix match. The full vocabulary lives here:
+// BroadcastErrorPrefix / FutureErrorPrefix / NodeErrorPrefix are written
+// by broadcastCollectDataInfo below; TopLevelErrorPrefix is written by
+// the InspectAll RPC handler in banyand/queue/sub/group_lifecycle.go,
+// which observes the top-level error returned by CollectDataInfo.
+const (
+ BroadcastErrorPrefix = "broadcast failed: "
+ FutureErrorPrefix = "future error: "
+ NodeErrorPrefix = "node error: "
+ TopLevelErrorPrefix = "top-level: "
+)
+
// GroupGetter provides method to get group metadata.
type GroupGetter interface {
GetGroup(ctx context.Context, group string) (*commonv1.Group, error)
@@ -72,22 +86,29 @@ func NewInfoCollectorRegistry(l *logger.Logger, groupGetter
GroupGetter) *InfoCo
}
}
-// CollectDataInfo collects data information from both local and remote data
nodes.
-func (icr *InfoCollectorRegistry) CollectDataInfo(ctx context.Context, group
string) ([]*databasev1.DataInfo, error) {
+// CollectDataInfo collects data information from both local and remote data
+// nodes. Returns the aggregated DataInfo slice plus a separate slice of
+// per-node collection errors observed during broadcast (broadcast failure,
+// future errors, listener-side common.Error responses); the two slices
+// are independent -- the errors slice only contains failures and is not
+// positionally aligned with DataInfo. The error return is reserved for
+// top-level failures that prevent any aggregation (GetGroup failure,
+// local collector failure, unsupported catalog).
+func (icr *InfoCollectorRegistry) CollectDataInfo(ctx context.Context, group
string) ([]*databasev1.DataInfo, []string, error) {
g, getErr := icr.groupGetter.GetGroup(ctx, group)
if getErr != nil {
- return nil, getErr
+ return nil, nil, getErr
}
localInfo, localErr := icr.collectDataInfoLocal(ctx, g.Catalog, group)
if localErr != nil {
- return nil, localErr
+ return nil, nil, localErr
}
localInfoList := []*databasev1.DataInfo{}
if localInfo != nil {
localInfoList = []*databasev1.DataInfo{localInfo}
}
if icr.dataBroadcaster == nil {
- return localInfoList, nil
+ return localInfoList, nil, nil
}
var topic bus.Topic
@@ -99,25 +120,27 @@ func (icr *InfoCollectorRegistry) CollectDataInfo(ctx
context.Context, group str
case commonv1.Catalog_CATALOG_TRACE:
topic = data.TopicTraceCollectDataInfo
default:
- return nil, fmt.Errorf("unsupported catalog type: %v",
g.Catalog)
+ return nil, nil, fmt.Errorf("unsupported catalog type: %v",
g.Catalog)
}
- remoteInfo := icr.broadcastCollectDataInfo(topic, group)
- return append(localInfoList, remoteInfo...), nil
+ remoteInfo, collectionErrors := icr.broadcastCollectDataInfo(topic,
group)
+ return append(localInfoList, remoteInfo...), collectionErrors, nil
}
-func (icr *InfoCollectorRegistry) broadcastCollectDataInfo(topic bus.Topic,
group string) []*databasev1.DataInfo {
+func (icr *InfoCollectorRegistry) broadcastCollectDataInfo(topic bus.Topic,
group string) ([]*databasev1.DataInfo, []string) {
message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()),
&databasev1.GroupRegistryServiceInspectRequest{Group: group})
futures, broadcastErr :=
icr.dataBroadcaster.Broadcast(inspectBroadcastTimeout, topic, message)
if broadcastErr != nil {
icr.l.Warn().Err(broadcastErr).Str("group", group).Msg("failed
to broadcast collect data info request")
- return []*databasev1.DataInfo{}
+ return []*databasev1.DataInfo{}, []string{BroadcastErrorPrefix
+ broadcastErr.Error()}
}
dataInfoList := make([]*databasev1.DataInfo, 0, len(futures))
+ var collectionErrors []string
for _, future := range futures {
msg, getErr := future.Get()
if getErr != nil {
icr.l.Warn().Err(getErr).Str("group",
group).Msg("failed to get collect data info response")
+ collectionErrors = append(collectionErrors,
FutureErrorPrefix+getErr.Error())
continue
}
msgData := msg.Data()
@@ -128,9 +151,10 @@ func (icr *InfoCollectorRegistry)
broadcastCollectDataInfo(topic bus.Topic, grou
}
case *common.Error:
icr.l.Warn().Str("error", d.Error()).Str("group",
group).Msg("error collecting data info from node")
+ collectionErrors = append(collectionErrors,
NodeErrorPrefix+d.Error())
}
}
- return dataInfoList
+ return dataInfoList, collectionErrors
}
func (icr *InfoCollectorRegistry) collectDataInfoLocal(ctx context.Context,
catalog commonv1.Catalog, group string) (*databasev1.DataInfo, error) {
diff --git a/banyand/queue/sub/group_lifecycle.go
b/banyand/queue/sub/group_lifecycle.go
index 8104015da..d8d1f495b 100644
--- a/banyand/queue/sub/group_lifecycle.go
+++ b/banyand/queue/sub/group_lifecycle.go
@@ -24,6 +24,7 @@ import (
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
fodcv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
)
// maxInspectGroupConcurrency caps how many groups InspectAll will inspect
@@ -77,12 +78,19 @@ func (s *server) inspectGroup(ctx context.Context, group
*commonv1.Group) *fodcv
Catalog: catalogToString(group.Catalog),
ResourceOpts: group.ResourceOpts,
}
- dataInfo, err := s.metadataRepo.CollectDataInfo(ctx, groupName)
+ // CollectDataInfo's contract: a non-nil err means no DataInfo and nil
+ // collectionErrs (per banyand/metadata/schema/collector.go); a nil err
+ // means DataInfo is populated and collectionErrs may carry per-node
+ // broadcast failures. The two paths never overlap, so the top-level
+ // branch sets a fresh single-element Errors slice.
+ dataInfo, collectionErrs, err := s.metadataRepo.CollectDataInfo(ctx,
groupName)
if err != nil {
s.log.Warn().Err(err).Str("group", groupName).Msg("Failed to
collect data info")
- } else {
- info.DataInfo = dataInfo
+ info.Errors = []string{schema.TopLevelErrorPrefix + err.Error()}
+ return info
}
+ info.DataInfo = dataInfo
+ info.Errors = collectionErrs
return info
}
diff --git a/banyand/queue/sub/group_lifecycle_cold_tier_test.go
b/banyand/queue/sub/group_lifecycle_cold_tier_test.go
new file mode 100644
index 000000000..46beacd4c
--- /dev/null
+++ b/banyand/queue/sub/group_lifecycle_cold_tier_test.go
@@ -0,0 +1,298 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sub
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ fodcv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// coldTierRepo is a metadata.Repo stub modeled on cold-tier data nodes.
+// CollectDataInfo simulates the post-fix output of
schemaRepo.collectSeriesIndexInfo
+// for idle-closed segments: returns DataInfo whose SegmentInfo entries each
carry
+// an empty SeriesIndexInfo (DataCount=0, DataSizeBytes=0). The hook lets tests
+// inject failures, latency, or per-call mutation to mimic the cold-tier
+// "InspectAll while closeIdleSegments is firing" race window.
+type coldTierRepo struct {
+ metadata.Repo
+ groupRegistry *mockGroupRegistry
+ hook func(group string, callIdx int) ([]*databasev1.DataInfo,
[]string, error)
+ callsByGroup sync.Map // map[string]*atomic.Int64
+ totalCalls atomic.Int64
+ concurrentNow atomic.Int32
+ concurrentMax atomic.Int32
+ panicCount atomic.Int32
+}
+
+func (r *coldTierRepo) GroupRegistry() schema.Group {
+ return r.groupRegistry
+}
+
+func (r *coldTierRepo) CollectDataInfo(_ context.Context, group string) (out
[]*databasev1.DataInfo, collectionErrs []string, err error) {
+ defer func() {
+ if rec := recover(); rec != nil {
+ r.panicCount.Add(1)
+ err = fmt.Errorf("panic in CollectDataInfo: %v", rec)
+ }
+ }()
+ r.totalCalls.Add(1)
+ current := r.concurrentNow.Add(1)
+ defer r.concurrentNow.Add(-1)
+ for {
+ hi := r.concurrentMax.Load()
+ if current <= hi || r.concurrentMax.CompareAndSwap(hi, current)
{
+ break
+ }
+ }
+ cntAny, _ := r.callsByGroup.LoadOrStore(group, new(atomic.Int64))
+ idx := cntAny.(*atomic.Int64).Add(1)
+ if r.hook != nil {
+ return r.hook(group, int(idx-1))
+ }
+ return coldTierEmptyDataInfo(), nil, nil
+}
+
+// coldTierEmptyDataInfo mirrors the DataInfo shape the fixed schemaRepo
+// returns when every selected segment is in the idle-closed residual state
+// (s.index = nil): non-nil top-level DataInfo with one SegmentInfo entry whose
+// SeriesIndexInfo is the zero value.
+func coldTierEmptyDataInfo() []*databasev1.DataInfo {
+ return []*databasev1.DataInfo{
+ {
+ SegmentInfo: []*databasev1.SegmentInfo{
+ {
+ SegmentId: "seg-cold-tier-idle",
+ SeriesIndexInfo:
&databasev1.SeriesIndexInfo{},
+ ShardInfo:
[]*databasev1.ShardInfo{},
+ },
+ },
+ DataSizeBytes: 0,
+ },
+ }
+}
+
+func newColdTierGroups(catalogs ...commonv1.Catalog) []*commonv1.Group {
+ groups := make([]*commonv1.Group, 0, len(catalogs))
+ for i, cat := range catalogs {
+ groups = append(groups, &commonv1.Group{
+ Metadata: &commonv1.Metadata{Name:
fmt.Sprintf("cold_group_%d", i)},
+ Catalog: cat,
+ ResourceOpts: &commonv1.ResourceOpts{
+ ShardNum: 2,
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit: commonv1.IntervalRule_UNIT_DAY,
Num: 1,
+ },
+ Ttl: &commonv1.IntervalRule{
+ Unit: commonv1.IntervalRule_UNIT_DAY,
Num: 30,
+ },
+ },
+ })
+ }
+ return groups
+}
+
+// TestInspectAll_ColdTier_Integration simulates the cold-tier traffic
+// pattern that triggered the production nil-pointer panic on
+// demo-banyandb-data-cold-0. FODC InspectAll fan-out plus
+// closeIdleSegments leave segments in the "refCount=0, index=nil"
+// residual state. With the typed-nil and refcount fixes applied,
+// schemaRepo.collectSeriesIndexInfo short-circuits and returns an empty
+// SeriesIndexInfo rather than panicking.
+//
+// This test wires the real InspectAll RPC handler against a stub
+// metadata.Repo that mirrors the post-fix output of the storage path,
+// then drives four cold-tier scenarios end-to-end:
+//
+// A. single InspectAll after idle-close
+// B. repeated InspectAll cycles (cold-tier reinspect)
+// C. concurrent InspectAll calls
+// D. concurrent InspectAll with simulated idle-close churn between calls
+func TestInspectAll_ColdTier_Integration(t *testing.T) {
+ groups := newColdTierGroups(
+ commonv1.Catalog_CATALOG_MEASURE,
+ commonv1.Catalog_CATALOG_STREAM,
+ commonv1.Catalog_CATALOG_TRACE,
+ )
+
+ t.Run("A_SingleInspectAfterIdleClose", func(t *testing.T) {
+ repo := &coldTierRepo{groupRegistry: &mockGroupRegistry{groups:
groups}}
+ s := &server{log: logger.GetLogger("cold-tier-A"),
metadataRepo: repo}
+
+ resp, err := s.InspectAll(context.Background(),
&fodcv1.InspectAllRequest{})
+ require.NoError(t, err)
+ require.NotNil(t, resp)
+ require.Len(t, resp.Groups, len(groups))
+
+ for _, g := range resp.Groups {
+ require.NotEmpty(t, g.DataInfo, "group %s should expose
post-fix DataInfo", g.Name)
+ for _, di := range g.DataInfo {
+ require.NotNil(t, di)
+ for _, seg := range di.SegmentInfo {
+ require.NotNil(t, seg.SeriesIndexInfo,
+ "every segment must carry a
non-nil (possibly empty) SeriesIndexInfo")
+ assert.Equal(t, int64(0),
seg.SeriesIndexInfo.DataCount)
+ assert.Equal(t, int64(0),
seg.SeriesIndexInfo.DataSizeBytes)
+ }
+ }
+ }
+ assert.Equal(t, int32(0), repo.panicCount.Load(), "no goroutine
should panic")
+ })
+
+ t.Run("B_RepeatedInspectAll_ColdTierReinspect", func(t *testing.T) {
+ repo := &coldTierRepo{groupRegistry: &mockGroupRegistry{groups:
groups}}
+ s := &server{log: logger.GetLogger("cold-tier-B"),
metadataRepo: repo}
+
+ const cycles = 25
+ for i := 0; i < cycles; i++ {
+ resp, err := s.InspectAll(context.Background(),
&fodcv1.InspectAllRequest{})
+ require.NoError(t, err, "cycle %d", i)
+ require.Len(t, resp.Groups, len(groups), "cycle %d
returned wrong group count", i)
+ }
+
+ // Every group must have been visited exactly `cycles` times --
no state
+ // leaks across repeated InspectAll calls (this is the "fetch again"
check).
+ for _, g := range groups {
+ cntAny, ok := repo.callsByGroup.Load(g.Metadata.Name)
+ require.True(t, ok, "group %s missing call counter",
g.Metadata.Name)
+ assert.Equal(t, int64(cycles),
cntAny.(*atomic.Int64).Load(),
+ "group %s expected exactly %d calls",
g.Metadata.Name, cycles)
+ }
+ assert.Equal(t, int64(cycles*len(groups)),
repo.totalCalls.Load())
+ assert.Equal(t, int32(0), repo.panicCount.Load())
+ })
+
+ t.Run("C_ConcurrentInspectAll", func(t *testing.T) {
+ repo := &coldTierRepo{groupRegistry: &mockGroupRegistry{groups:
groups}}
+ s := &server{log: logger.GetLogger("cold-tier-C"),
metadataRepo: repo}
+
+ const callers = 8
+ var (
+ wg sync.WaitGroup
+ start = make(chan struct{})
+ panicCnt int64
+ )
+ for i := 0; i < callers; i++ {
+ wg.Add(1)
+ go func(id int) {
+ defer wg.Done()
+ defer func() {
+ if r := recover(); r != nil {
+ atomic.AddInt64(&panicCnt, 1)
+ t.Errorf("caller-%d panicked:
%v", id, r)
+ }
+ }()
+ <-start
+ resp, err := s.InspectAll(context.Background(),
&fodcv1.InspectAllRequest{})
+ if !assert.NoError(t, err, "caller-%d", id) {
+ return
+ }
+ if !assert.NotNil(t, resp, "caller-%d", id) {
+ return
+ }
+ assert.Len(t, resp.Groups, len(groups),
"caller-%d", id)
+ }(i)
+ }
+ close(start)
+ wg.Wait()
+
+ assert.Equal(t, int64(0), panicCnt, "no caller may panic")
+ assert.Equal(t, int32(0), repo.panicCount.Load())
+ assert.Equal(t, int64(callers*len(groups)),
repo.totalCalls.Load())
+ // Per-group fan-out cap is 32, so at least two groups should
have
+ // overlapped within at least one InspectAll invocation.
+ assert.GreaterOrEqual(t, repo.concurrentMax.Load(), int32(2),
+ "concurrent InspectAll should overlap at least two
CollectDataInfo calls")
+ })
+
+ t.Run("D_ConcurrentInspectAll_WithIdleCloseChurn", func(t *testing.T) {
+ // hook simulates the cold-tier race window: calls whose zero-based
per-group
+ // index is a positive multiple of 17 return a benign error to mimic
transient
+ // disk pressure; every remaining call returns a "fresh idle close just happened" payload (still empty).
+ brittleHook := func(group string, callIdx int)
([]*databasev1.DataInfo, []string, error) {
+ if callIdx%17 == 0 && callIdx > 0 {
+ return nil, nil, fmt.Errorf("simulated
transient close error on %s call#%d", group, callIdx)
+ }
+ // Yield to amplify scheduling jitter, then return
empty info.
+ time.Sleep(50 * time.Microsecond)
+ return coldTierEmptyDataInfo(), nil, nil
+ }
+ repo := &coldTierRepo{
+ groupRegistry: &mockGroupRegistry{groups: groups},
+ hook: brittleHook,
+ }
+ s := &server{log: logger.GetLogger("cold-tier-D"),
metadataRepo: repo}
+
+ const callers = 6
+ const cycles = 20
+ var (
+ wg sync.WaitGroup
+ start = make(chan struct{})
+ panicCnt int64
+ )
+ for i := 0; i < callers; i++ {
+ wg.Add(1)
+ go func(id int) {
+ defer wg.Done()
+ defer func() {
+ if r := recover(); r != nil {
+ atomic.AddInt64(&panicCnt, 1)
+ t.Errorf("caller-%d panicked:
%v", id, r)
+ }
+ }()
+ <-start
+ for c := 0; c < cycles; c++ {
+ resp, err :=
s.InspectAll(context.Background(), &fodcv1.InspectAllRequest{})
+ // require.* would FailNow inside this
goroutine, which
+ // is forbidden by the testing package;
use assert and
+ // bail out early on the first failure.
+ if !assert.NoError(t, err, "caller-%d
cycle-%d", id, c) {
+ return
+ }
+ if !assert.NotNil(t, resp, "caller-%d
cycle-%d", id, c) {
+ return
+ }
+ if !assert.Len(t, resp.Groups,
len(groups), "caller-%d cycle-%d", id, c) {
+ return
+ }
+ }
+ }(i)
+ }
+ close(start)
+ wg.Wait()
+
+ require.Equal(t, int64(0), panicCnt, "no caller may panic")
+ require.Equal(t, int32(0), repo.panicCount.Load(),
+ "CollectDataInfo must not panic under the cold-tier
churn")
+ assert.GreaterOrEqual(t, repo.concurrentMax.Load(), int32(2),
+ "concurrent InspectAll under churn should overlap at
least two CollectDataInfo calls")
+ })
+}
diff --git a/banyand/queue/sub/group_lifecycle_test.go
b/banyand/queue/sub/group_lifecycle_test.go
index f3a5cd6ff..4f912b323 100644
--- a/banyand/queue/sub/group_lifecycle_test.go
+++ b/banyand/queue/sub/group_lifecycle_test.go
@@ -47,20 +47,21 @@ func (m *mockGroupRegistry) ListGroup(_ context.Context)
([]*commonv1.Group, err
type mockMetadataRepo struct {
metadata.Repo
- groupRegistry *mockGroupRegistry
- dataInfoMap map[string][]*databasev1.DataInfo
- dataInfoErr map[string]error
- slowGroups map[string]time.Duration
- collectStarts atomic.Int32
- concurrentMax atomic.Int32
- concurrentNow atomic.Int32
+ groupRegistry *mockGroupRegistry
+ dataInfoMap map[string][]*databasev1.DataInfo
+ dataInfoErr map[string]error
+ collectionErrors map[string][]string
+ slowGroups map[string]time.Duration
+ collectStarts atomic.Int32
+ concurrentMax atomic.Int32
+ concurrentNow atomic.Int32
}
func (m *mockMetadataRepo) GroupRegistry() schema.Group {
return m.groupRegistry
}
-func (m *mockMetadataRepo) CollectDataInfo(_ context.Context, group string)
([]*databasev1.DataInfo, error) {
+func (m *mockMetadataRepo) CollectDataInfo(_ context.Context, group string)
([]*databasev1.DataInfo, []string, error) {
m.collectStarts.Add(1)
current := m.concurrentNow.Add(1)
defer m.concurrentNow.Add(-1)
@@ -77,13 +78,18 @@ func (m *mockMetadataRepo) CollectDataInfo(_
context.Context, group string) ([]*
}
if m.dataInfoErr != nil {
if err, ok := m.dataInfoErr[group]; ok {
- return nil, err
+ return nil, nil, err
}
}
+ var dataInfo []*databasev1.DataInfo
if m.dataInfoMap != nil {
- return m.dataInfoMap[group], nil
+ dataInfo = m.dataInfoMap[group]
}
- return nil, nil
+ var collectionErrs []string
+ if m.collectionErrors != nil {
+ collectionErrs = m.collectionErrors[group]
+ }
+ return dataInfo, collectionErrs, nil
}
func TestCatalogToString(t *testing.T) {
@@ -264,4 +270,92 @@ func TestInspectAll_RunsGroupsInParallel(t *testing.T) {
assert.GreaterOrEqual(t, repo.concurrentMax.Load(), int32(2),
"at least two CollectDataInfo invocations must overlap;
observed max concurrency was %d",
repo.concurrentMax.Load())
+ for _, g := range resp.Groups {
+ assert.Empty(t, g.Errors, "successful group %s must not carry
errors", g.Name)
+ }
+}
+
+// TestInspectAll_SurfacesTopLevelError asserts that a top-level
+// CollectDataInfo error is reported through GroupLifecycleInfo.errors with
+// a "top-level:" prefix and that DataInfo stays empty for that group.
+func TestInspectAll_SurfacesTopLevelError(t *testing.T) {
+ repo := &mockMetadataRepo{
+ groupRegistry: &mockGroupRegistry{
+ groups: []*commonv1.Group{
+ {Metadata: &commonv1.Metadata{Name:
"ok_group"}, Catalog: commonv1.Catalog_CATALOG_MEASURE},
+ {Metadata: &commonv1.Metadata{Name:
"bad_group"}, Catalog: commonv1.Catalog_CATALOG_STREAM},
+ },
+ },
+ dataInfoMap: map[string][]*databasev1.DataInfo{
+ "ok_group": {{DataSizeBytes: 1024}},
+ },
+ dataInfoErr: map[string]error{
+ "bad_group": fmt.Errorf("metadataRepo: simulated load
failure"),
+ },
+ }
+ s := &server{log: logger.GetLogger("test"), metadataRepo: repo}
+
+ resp, err := s.InspectAll(context.Background(),
&fodcv1.InspectAllRequest{})
+ require.NoError(t, err)
+ require.Len(t, resp.Groups, 2)
+
+ byName := map[string]*fodcv1.GroupLifecycleInfo{}
+ for _, g := range resp.Groups {
+ byName[g.Name] = g
+ }
+
+ ok := byName["ok_group"]
+ require.NotNil(t, ok)
+ assert.Empty(t, ok.Errors, "ok_group must not carry errors")
+ assert.NotEmpty(t, ok.DataInfo)
+
+ bad := byName["bad_group"]
+ require.NotNil(t, bad)
+ assert.Empty(t, bad.DataInfo, "failed group must not expose stale
DataInfo")
+ require.NotEmpty(t, bad.Errors, "failed group must surface the
CollectDataInfo error")
+ assert.Contains(t, bad.Errors[0], "top-level:",
+ "top-level failures must be tagged with the top-level prefix")
+ assert.Contains(t, bad.Errors[0], "simulated load failure",
+ "the original error message must be preserved")
+}
+
+// TestInspectAll_PropagatesPartialFailureErrors asserts that per-node
+// failures reported by mockMetadataRepo.collectionErrors are passed
+// through to GroupLifecycleInfo.errors for the originating group, while
+// DataInfo still carries the entries from nodes that succeeded.
+func TestInspectAll_PropagatesPartialFailureErrors(t *testing.T) {
+ repo := &mockMetadataRepo{
+ groupRegistry: &mockGroupRegistry{
+ groups: []*commonv1.Group{
+ {Metadata: &commonv1.Metadata{Name:
"partial_group"}, Catalog: commonv1.Catalog_CATALOG_MEASURE},
+ },
+ },
+ dataInfoMap: map[string][]*databasev1.DataInfo{
+ "partial_group": {{DataSizeBytes: 512}},
+ },
+ collectionErrors: map[string][]string{
+ "partial_group": {
+ "future error: rpc error: nil pointer
dereference",
+ "node error: cold-0 panic",
+ },
+ },
+ }
+ s := &server{log: logger.GetLogger("test"), metadataRepo: repo}
+
+ resp, err := s.InspectAll(context.Background(),
&fodcv1.InspectAllRequest{})
+ require.NoError(t, err)
+ require.Len(t, resp.Groups, 1)
+ g := resp.Groups[0]
+ assert.NotEmpty(t, g.DataInfo, "partial success must still expose the
DataInfo entries it did get")
+ assert.Equal(t,
+ []string{
+ "future error: rpc error: nil pointer dereference",
+ "node error: cold-0 panic",
+ },
+ g.Errors,
+ "per-node collection errors must be passed through to
GroupLifecycleInfo.errors")
+ for _, e := range g.Errors {
+ assert.NotContains(t, e, "top-level:",
+ "partial-success errors must not carry the top-level:
prefix")
+ }
}
diff --git a/banyand/stream/metadata_internal_test.go
b/banyand/stream/metadata_internal_test.go
new file mode 100644
index 000000000..a292b6704
--- /dev/null
+++ b/banyand/stream/metadata_internal_test.go
@@ -0,0 +1,48 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+)
+
+// nilIndexDBSegment is a stub Segment whose IndexDB() returns an untyped
+// nil. The embedded interface lets us satisfy the Segment[*tsTable, option]
+// signature without implementing every method (only IndexDB is exercised
+// by collectSeriesIndexInfo).
+type nilIndexDBSegment struct {
+ storage.Segment[*tsTable, option]
+}
+
+func (nilIndexDBSegment) IndexDB() storage.IndexDB { return nil }
+
+// TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB locks in the contract
+// that collectSeriesIndexInfo returns an empty SeriesIndexInfo (no panic)
+// when the segment is in the cold-tier residual state where its underlying
+// series index has been torn down by performCleanup.
+func TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB(t *testing.T) {
+ sr := &schemaRepo{}
+ info := sr.collectSeriesIndexInfo(nilIndexDBSegment{})
+ require.NotNil(t, info)
+ require.Equal(t, int64(0), info.DataCount)
+ require.Equal(t, int64(0), info.DataSizeBytes)
+}
diff --git a/banyand/trace/metadata_internal_test.go
b/banyand/trace/metadata_internal_test.go
new file mode 100644
index 000000000..378078907
--- /dev/null
+++ b/banyand/trace/metadata_internal_test.go
@@ -0,0 +1,48 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+)
+
+// nilIndexDBSegment is a stub Segment whose IndexDB() returns an untyped
+// nil. The embedded interface lets us satisfy the Segment[*tsTable, option]
+// signature without implementing every method (only IndexDB is exercised
+// by collectSeriesIndexInfo).
+type nilIndexDBSegment struct {
+ storage.Segment[*tsTable, option]
+}
+
+func (nilIndexDBSegment) IndexDB() storage.IndexDB { return nil }
+
+// TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB locks in the contract
+// that collectSeriesIndexInfo returns an empty SeriesIndexInfo (no panic)
+// when the segment is in the cold-tier residual state where its underlying
+// series index has been torn down by performCleanup.
+func TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB(t *testing.T) {
+ sr := &schemaRepo{}
+ info := sr.collectSeriesIndexInfo(nilIndexDBSegment{})
+ require.NotNil(t, info)
+ require.Equal(t, int64(0), info.DataCount)
+ require.Equal(t, int64(0), info.DataSizeBytes)
+}
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 631bd679c..04885839a 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -5282,6 +5282,7 @@ Phase represents the current phase of the deletion task.
| catalog | [string](#string) | | |
| resource_opts |
[banyandb.common.v1.ResourceOpts](#banyandb-common-v1-ResourceOpts) | | |
| data_info | [banyandb.database.v1.DataInfo](#banyandb-database-v1-DataInfo)
| repeated | |
+| errors | [string](#string) | repeated | errors lists every failure observed
while collecting data_info for this group: top-level CollectDataInfo failures
(GetGroup, missing collector, dial failure -- prefixed "top-level: ")
and per-node broadcast failures (prefixed "future error: ", "node
error: ", "broadcast failed: "). Combined with len(data_info),
consumers can tell the following four states apart: - data_info empty
&& errors empty -> no no [...]
diff --git a/fodc/proxy/internal/lifecycle/manager.go
b/fodc/proxy/internal/lifecycle/manager.go
index 23c7bc9db..d17128b8c 100644
--- a/fodc/proxy/internal/lifecycle/manager.go
+++ b/fodc/proxy/internal/lifecycle/manager.go
@@ -20,6 +20,7 @@ package lifecycle
import (
"context"
+ "sort"
"sync"
fodcv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
@@ -258,16 +259,41 @@ func (m *Manager) buildInspectionResult(allData
[]*agentLifecycleData) *Inspecti
func (m *Manager) mergeGroups(allData []*agentLifecycleData)
[]*fodcv1.GroupLifecycleInfo {
groupMap := make(map[string]*fodcv1.GroupLifecycleInfo)
+ // Errors are unioned across agents because each agent may observe a
+ // different subset of per-node failures (e.g. liaison-0 sees cold-0
+ // time out while liaison-1 sees cold-0 panic). Last-wins on the rest
+ // of the GroupLifecycleInfo is fine -- name/catalog/resource_opts are
+ // agent-invariant -- but errors must be the deduped union.
+ mergedErrors := make(map[string]map[string]struct{})
for _, ad := range allData {
if ad == nil || ad.Data == nil {
continue
}
for _, g := range ad.Data.Groups {
groupMap[g.Name] = g
+ if len(g.Errors) == 0 {
+ continue
+ }
+ set, ok := mergedErrors[g.Name]
+ if !ok {
+ set = make(map[string]struct{})
+ mergedErrors[g.Name] = set
+ }
+ for _, e := range g.Errors {
+ set[e] = struct{}{}
+ }
}
}
groups := make([]*fodcv1.GroupLifecycleInfo, 0, len(groupMap))
- for _, g := range groupMap {
+ for name, g := range groupMap {
+ if set := mergedErrors[name]; len(set) > 0 {
+ errs := make([]string, 0, len(set))
+ for e := range set {
+ errs = append(errs, e)
+ }
+ sort.Strings(errs)
+ g.Errors = errs
+ }
groups = append(groups, g)
}
return groups
diff --git a/fodc/proxy/internal/lifecycle/manager_test.go
b/fodc/proxy/internal/lifecycle/manager_test.go
index 55a76f166..3f1117705 100644
--- a/fodc/proxy/internal/lifecycle/manager_test.go
+++ b/fodc/proxy/internal/lifecycle/manager_test.go
@@ -394,3 +394,64 @@ func TestManager_CollectLifecycle_ContextCanceled(t
*testing.T) {
result, _ := mgr.CollectLifecycle(ctx)
require.NotNil(t, result)
}
+
+// TestManager_MergeGroups_UnionsErrorsAcrossAgents verifies that when
+// multiple agents report the same group, the errors observed by each
+// agent are unioned (deduped, sorted) rather than overwritten last-wins,
+// so downstream consumers see every per-node failure regardless of which
+// agent's view they happened to land on.
+func TestManager_MergeGroups_UnionsErrorsAcrossAgents(t *testing.T) {
+ log := initTestLogger(t)
+ mgr := NewManager(nil, nil, log)
+
+ groupName := "sw_metric"
+ agentA := &agentLifecycleData{
+ PodName: "liaison-0",
+ Data: &fodcv1.LifecycleData{
+ Groups: []*fodcv1.GroupLifecycleInfo{{
+ Name: groupName,
+ Catalog: "CATALOG_MEASURE",
+ Errors: []string{
+ "node error: cold-0 panic",
+ "future error: rpc deadline",
+ },
+ }},
+ },
+ }
+ agentB := &agentLifecycleData{
+ PodName: "liaison-1",
+ Data: &fodcv1.LifecycleData{
+ Groups: []*fodcv1.GroupLifecycleInfo{{
+ Name: groupName,
+ Catalog: "CATALOG_MEASURE",
+ Errors: []string{
+ "node error: cold-0 panic", //
duplicate of A's first
+ "broadcast failed: dial timeout",
+ },
+ }},
+ },
+ }
+ agentC := &agentLifecycleData{
+ PodName: "liaison-2",
+ Data: &fodcv1.LifecycleData{
+ Groups: []*fodcv1.GroupLifecycleInfo{{
+ Name: groupName,
+ Catalog: "CATALOG_MEASURE",
+ // no errors
+ }},
+ },
+ }
+
+ merged := mgr.mergeGroups([]*agentLifecycleData{agentA, agentB, agentC})
+ require.Len(t, merged, 1)
+ g := merged[0]
+ assert.Equal(t, groupName, g.Name)
+ assert.Equal(t,
+ []string{
+ "broadcast failed: dial timeout",
+ "future error: rpc deadline",
+ "node error: cold-0 panic",
+ },
+ g.Errors,
+ "errors must be the deduped, sorted union of every agent's
view")
+}