This is an automated email from the ASF dual-hosted git repository.

mrproliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new b8168b76a Fix fetch-segment nil pointer in cold stage and propagate 
the error when inspecting the group (#1115)
b8168b76a is described below

commit b8168b76ab7c11de7d1603f3ebaac6cf5b1148a7
Author: mrproliu <[email protected]>
AuthorDate: Thu May 7 11:23:26 2026 +0800

    Fix fetch-segment nil pointer in cold stage and propagate the error when 
inspecting the group (#1115)
---
 CHANGES.md                                         |   2 +
 api/proto/banyandb/fodc/v1/rpc.proto               |  11 +
 banyand/internal/storage/index.go                  |  49 +++-
 banyand/internal/storage/segment.go                |  42 ++-
 banyand/internal/storage/segment_test.go           | 318 +++++++++++++++++++++
 banyand/liaison/grpc/deletion.go                   |  14 +-
 banyand/liaison/grpc/deletion_test.go              |  28 +-
 banyand/liaison/grpc/registry.go                   |   9 +-
 banyand/measure/metadata_internal_test.go          |  48 ++++
 banyand/metadata/client.go                         |   2 +-
 banyand/metadata/metadata.go                       |   2 +-
 banyand/metadata/schema/collector.go               |  46 ++-
 banyand/queue/sub/group_lifecycle.go               |  14 +-
 .../queue/sub/group_lifecycle_cold_tier_test.go    | 298 +++++++++++++++++++
 banyand/queue/sub/group_lifecycle_test.go          | 116 +++++++-
 banyand/stream/metadata_internal_test.go           |  48 ++++
 banyand/trace/metadata_internal_test.go            |  48 ++++
 docs/api-reference.md                              |   1 +
 fodc/proxy/internal/lifecycle/manager.go           |  28 +-
 fodc/proxy/internal/lifecycle/manager_test.go      |  61 ++++
 20 files changed, 1146 insertions(+), 39 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 1a8a81e63..0f7457ef8 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -67,6 +67,8 @@ Release Notes.
 - Fix FODC lifecycle cache poisoning where transient `InspectAll` failures 
were cached for 10 minutes and masked liaison recovery; raise FODC agent and 
proxy timeouts from 10s to 40s.
 - Fix FODC `/cluster/lifecycle` dropping zero-valued group fields (e.g. 
`replicas=0`, `close=false`) under `encoding/json` + `omitempty`; switch to 
`protojson` so all fields are emitted (nil nested messages serialize as `null`).
 - Fix trace `block_writer` panic on out-of-order timestamps within the same 
traceID, which dropped one trace-write batch per panic in multi-agent 
SkyWalking deployments. Spans of a single trace originate from 
independently-clocked services, and trace storage is organized by traceID 
rather than timestamp, so per-traceID timestamp monotonicity is not a writer 
invariant.
+- Fix nil-pointer panic on cold-tier data nodes when FODC `InspectAll` raced 
with idle-segment cleanup.
+- Add `GroupLifecycleInfo.errors` to surface per-group collection failures 
from FODC `InspectAll` instead of silently dropping the affected node entry.
 
 ### Chores
 
diff --git a/api/proto/banyandb/fodc/v1/rpc.proto 
b/api/proto/banyandb/fodc/v1/rpc.proto
index dd7a5c67a..9315da8a6 100644
--- a/api/proto/banyandb/fodc/v1/rpc.proto
+++ b/api/proto/banyandb/fodc/v1/rpc.proto
@@ -114,6 +114,17 @@ message GroupLifecycleInfo {
   string catalog = 2;
   banyandb.common.v1.ResourceOpts resource_opts = 3;
   repeated banyandb.database.v1.DataInfo data_info = 4;
+  // errors lists every failure observed while collecting data_info for this
+  // group: top-level CollectDataInfo failures (GetGroup, missing collector,
+  // dial failure -- prefixed "top-level: ") and per-node broadcast failures
+  // (prefixed "future error: ", "node error: ", "broadcast failed: ").
+  // Combined with len(data_info), consumers can tell the following four
+  // states apart:
+  //   - data_info empty && errors empty       -> no nodes reported (group 
inactive)
+  //   - data_info empty && errors non-empty   -> total failure
+  //   - data_info non-empty && errors empty   -> full success
+  //   - data_info non-empty && errors non-empty -> partial failure
+  repeated string errors = 5;
 }
 
 message LifecycleReport {
diff --git a/banyand/internal/storage/index.go 
b/banyand/internal/storage/index.go
index dff32d742..3ff0ae41c 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -37,14 +37,33 @@ import (
 const seriesIndexDirName = "sidx"
 
 func (s *segment[T, O]) IndexDB() IndexDB {
-       return s.index
+       // Snapshot s.index into a local: the field can be flipped to nil by
+       // performCleanup at any time, so a second read could see a different
+       // value. Returning the local also avoids boxing a nil *seriesIndex
+       // into a typed-nil IndexDB interface, which would defeat the
+       // `if indexDB == nil` guard on the caller side.
+       idx := s.index
+       if idx == nil {
+               return nil
+       }
+       return idx
 }
 
 func (s *segment[T, O]) Lookup(ctx context.Context, series []*pbv1.Series) 
(pbv1.SeriesList, error) {
-       sl, err := s.index.filter(ctx, series, nil, nil, nil)
+       idx := s.index
+       if idx == nil {
+               return nil, ErrSegmentClosed
+       }
+       sl, err := idx.filter(ctx, series, nil, nil, nil)
        return sl.SeriesList, err
 }
 
+// seriesIndex is the per-segment series index. Every method on this type
+// starts with `if s == nil { ... }` purely to absorb the typed-nil leak
+// that `segment.IndexDB()` could expose if a caller bypassed the producer
+// guard there. The guards intentionally do NOT make the type fully
+// nil-safe -- they assume that on a non-nil receiver, store / l / metrics
+// / p are all populated by newSeriesIndex (the only constructor).
 type seriesIndex struct {
        store   index.SeriesStore
        l       *logger.Logger
@@ -79,28 +98,45 @@ func newSeriesIndex(ctx context.Context, root string, 
flushTimeoutSeconds int64,
 }
 
 func (s *seriesIndex) Insert(docs index.Documents) error {
+       if s == nil {
+               return ErrSegmentClosed
+       }
        return s.store.InsertSeriesBatch(index.Batch{
                Documents: docs,
        })
 }
 
 func (s *seriesIndex) Update(docs index.Documents) error {
+       if s == nil {
+               return ErrSegmentClosed
+       }
        return s.store.UpdateSeriesBatch(index.Batch{
                Documents: docs,
        })
 }
 
 func (s *seriesIndex) EnableExternalSegments() (index.ExternalSegmentStreamer, 
error) {
+       if s == nil {
+               return nil, ErrSegmentClosed
+       }
        return s.store.EnableExternalSegments()
 }
 
+// Stats degrades to (0, 0) on a nil receiver as defense-in-depth in case
+// a typed-nil interface leaks past segment.IndexDB().
 func (s *seriesIndex) Stats() (dataCount int64, dataSizeBytes int64) {
+       if s == nil {
+               return 0, 0
+       }
        return s.store.Stats()
 }
 
 func (s *seriesIndex) filter(ctx context.Context, series []*pbv1.Series,
        projection []index.FieldKey, secondaryQuery index.Query, timeRange 
*timestamp.TimeRange,
 ) (data SeriesData, err error) {
+       if s == nil {
+               return SeriesData{}, ErrSegmentClosed
+       }
        if len(series) == 0 && secondaryQuery == nil && timeRange == nil {
                return data, nil
        }
@@ -231,6 +267,9 @@ func convertIndexSeriesToSeriesList(indexSeries 
[]index.SeriesDocument, hasField
 
 func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts 
IndexSearchOpts,
 ) (sd SeriesData, sortedValues [][]byte, err error) {
+       if s == nil {
+               return SeriesData{}, nil, ErrSegmentClosed
+       }
        tracer := query.GetTracer(ctx)
        if tracer != nil {
                var span *query.Span
@@ -309,6 +348,9 @@ func (s *seriesIndex) Search(ctx context.Context, series 
[]*pbv1.Series, opts In
 }
 
 func (s *seriesIndex) SearchWithoutSeries(ctx context.Context, opts 
IndexSearchOpts) (sd SeriesData, sortedValues [][]byte, err error) {
+       if s == nil {
+               return SeriesData{}, nil, ErrSegmentClosed
+       }
        tracer := query.GetTracer(ctx)
        if tracer != nil {
                var span *query.Span
@@ -384,6 +426,9 @@ func (s *seriesIndex) SearchWithoutSeries(ctx 
context.Context, opts IndexSearchO
 }
 
 func (s *seriesIndex) Close() error {
+       if s == nil {
+               return nil
+       }
        s.metrics.DeleteAll(s.p.SegLabelValues()...)
        return s.store.Close()
 }
diff --git a/banyand/internal/storage/segment.go 
b/banyand/internal/storage/segment.go
index 04a333507..4f1490bec 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -159,13 +159,41 @@ func (s *segment[T, O]) TablesWithShardIDs() (tt []T, 
shardIDs []common.ShardID,
        return tt, shardIDs, cc
 }
 
+// incRef acquires one reference on the segment, opening it via initialize
+// if it is currently closed (refCount<=0). On return, the caller is
+// guaranteed that the segment is alive (s.index != nil, shards loaded) and
+// will stay alive until the matching DecRef.
+//
+// The CAS loop closes a TOCTOU race that the older "Load + AddInt32"
+// shape suffered from: a goroutine could read refCount=1, race against a
+// concurrent DecRef that drives it to 0 and triggers performCleanup
+// (which sets s.index=nil and tears down shards), and then AddInt32 it
+// back to 1. The caller would walk away with refCount=1 -- formally a
+// valid reference -- but pointing at an already-cleaned-up segment, so
+// every subsequent IndexDB() / Tables() call would observe zero state.
+//
+// The CAS variant retries whenever a concurrent DecRef beats us to the
+// counter: if CAS fails, we re-Load and either succeed at the new value
+// or fall through to initialize() to reopen the segment under s.mu.
+// Symmetric to the CAS loop in DecRef: once DecRef drives refCount to 0 no
+// fast-path +1 can sneak in, so a racing caller must reopen via initialize()
+// under s.mu (this presumes performCleanup also serializes on s.mu).
 func (s *segment[T, O]) incRef(ctx context.Context) error {
        s.lastAccessed.Store(time.Now().UnixNano())
-       if atomic.LoadInt32(&s.refCount) <= 0 {
-               return s.initialize(ctx)
+       for {
+               current := atomic.LoadInt32(&s.refCount)
+               if current <= 0 {
+                       // Either the segment was never opened or DecRef just 
drove
+                       // refCount to 0; reopen under the mutex.
+                       return s.initialize(ctx)
+               }
+               // CAS so a concurrent DecRef cannot flip refCount to 0 and run
+               // performCleanup between our Load and our increment. On failure
+               // we re-Load and re-evaluate the branch.
+               if atomic.CompareAndSwapInt32(&s.refCount, current, current+1) {
+                       return nil
+               }
        }
-       atomic.AddInt32(&s.refCount, 1)
-       return nil
 }
 
 func (s *segment[T, O]) initialize(ctx context.Context) error {
@@ -173,6 +201,12 @@ func (s *segment[T, O]) initialize(ctx context.Context) 
error {
        defer s.mu.Unlock()
 
        if atomic.LoadInt32(&s.refCount) > 0 {
+               // Another goroutine reopened the segment while we waited for 
the
+               // mutex. Add the +1 our caller (incRef) skipped before entering
+               // the slow path; otherwise concurrent reopens would share one
+               // refCount and the first DecRef would cleanup while we are 
still
+               // using it.
+               atomic.AddInt32(&s.refCount, 1)
                return nil
        }
 
diff --git a/banyand/internal/storage/segment_test.go 
b/banyand/internal/storage/segment_test.go
index 9a55dd358..f50e07603 100644
--- a/banyand/internal/storage/segment_test.go
+++ b/banyand/internal/storage/segment_test.go
@@ -24,6 +24,8 @@ import (
        "os"
        "path/filepath"
        "strconv"
+       "sync"
+       "sync/atomic"
        "testing"
        "time"
 
@@ -860,3 +862,319 @@ func TestOpenFallbackOldFormatMetadata(t *testing.T) {
                seg.DecRef()
        }
 }
+
+// TestSegment_IndexDB_ReturnsUntypedNilWhenClosed is a regression guard for
+// the cold-tier nil-pointer panic. After a segment has been idle-closed
+// (s.index == nil), segment.IndexDB() must return an untyped nil interface
+// so that the standard caller pattern `if indexDB == nil { ... }` short
+// circuits correctly. Returning the underlying *seriesIndex would box a
+// typed-nil interface, defeat the nil check, and crash on the next method
+// call.
+func TestSegment_IndexDB_ReturnsUntypedNilWhenClosed(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+
+       ctx := context.Background()
+       l := logger.GetLogger("test-untyped-nil")
+       ctx = context.WithValue(ctx, logger.ContextKey, l)
+       ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+               return common.Position{Database: "test-db", Stage: "test-stage"}
+       })
+
+       opts := TSDBOpts[mockTSTable, mockTSTableOpener]{
+               TSTableCreator: func(_ fs.FileSystem, _ string, _ 
common.Position, _ *logger.Logger,
+                       _ timestamp.TimeRange, _ mockTSTableOpener, _ any,
+               ) (mockTSTable, error) {
+                       return mockTSTable{ID: common.ShardID(0)}, nil
+               },
+               ShardNum:                       1,
+               SegmentInterval:                IntervalRule{Unit: DAY, Num: 1},
+               TTL:                            IntervalRule{Unit: DAY, Num: 7},
+               SeriesIndexFlushTimeoutSeconds: 10,
+               SeriesIndexCacheMaxBytes:       1024 * 1024,
+       }
+
+       serviceCache := NewServiceCache().(*serviceCache)
+       sc := newSegmentController[mockTSTable, mockTSTableOpener](
+               ctx, tempDir, l, opts, nil, nil, time.Second,
+               
fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"), 
opts.MemoryLimit),
+               serviceCache, group,
+       )
+
+       now := time.Now().UTC()
+       startTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, 
time.UTC)
+       endTime := startTime.Add(24 * time.Hour)
+
+       segPath := filepath.Join(tempDir, 
"segment-"+startTime.Format(dayFormat))
+       require.NoError(t, os.MkdirAll(segPath, DirPerm))
+       require.NoError(t, os.WriteFile(
+               filepath.Join(segPath, metadataFilename),
+               []byte(currentVersion), FilePerm))
+
+       seg, err := sc.openSegment(ctx, startTime, endTime, segPath,
+               startTime.Format(dayFormat), sc.groupCache)
+       require.NoError(t, err)
+
+       // Idle-close: drop the only reference so performCleanup runs and the
+       // segment ends up in the residual state seen on cold-tier nodes
+       // (refCount=0, s.index=nil, but segment still reachable for reopen).
+       seg.DecRef()
+       require.Nil(t, seg.index)
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
+
+       indexDB := seg.IndexDB()
+       require.True(t, indexDB == nil,
+               "IndexDB() must return untyped nil after idle close so the 
caller's "+
+                       "`if indexDB == nil` guard fires")
+
+       // Drive the exact caller pattern from collectSeriesIndexInfo. It must
+       // short-circuit on the nil check and never dispatch Stats() on a nil
+       // receiver.
+       var dataCount, dataSize int64
+       func() {
+               defer func() {
+                       if r := recover(); r != nil {
+                               t.Fatalf("caller pattern must not panic but 
got: %v", r)
+                       }
+               }()
+               if indexDB != nil {
+                       dataCount, dataSize = indexDB.Stats()
+               }
+       }()
+       assert.Equal(t, int64(0), dataCount)
+       assert.Equal(t, int64(0), dataSize)
+}
+
+// TestSegment_ConcurrentReopen_RefCountConsistent stress-tests the
+// reference-count contract of segment.incRef under concurrent reopen.
+// Before the fix, segment.initialize() returned without AddInt32 when
+// another goroutine had already reopened the segment, so the second caller
+// "owned" a reference it had never accounted for. The first DecRef would
+// then trigger performCleanup while another goroutine was still using the
+// segment.
+//
+// After the fix, every concurrent incRef must add exactly one reference
+// regardless of which path it takes.
+//
+// Reliability note: a single 64-goroutine round can occasionally serialize
+// well enough that only the first caller enters the slow path
+// (initialize) and the rest take the fast path (CAS+1). That serialized
+// shape would also pass on the buggy code. To make the test reliable
+// across machines, the experiment is repeated for `rounds` cycles, each
+// starting from the residual state, so the chance that every round
+// serializes (and the bug slips through undetected) becomes very small.
+func TestSegment_ConcurrentReopen_RefCountConsistent(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+
+       ctx := context.Background()
+       l := logger.GetLogger("test-concurrent-reopen")
+       ctx = context.WithValue(ctx, logger.ContextKey, l)
+       ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+               return common.Position{Database: "test-db", Stage: "test-stage"}
+       })
+
+       opts := TSDBOpts[mockTSTable, mockTSTableOpener]{
+               TSTableCreator: func(_ fs.FileSystem, _ string, _ 
common.Position, _ *logger.Logger,
+                       _ timestamp.TimeRange, _ mockTSTableOpener, _ any,
+               ) (mockTSTable, error) {
+                       return mockTSTable{ID: common.ShardID(0)}, nil
+               },
+               ShardNum:                       2,
+               SegmentInterval:                IntervalRule{Unit: DAY, Num: 1},
+               TTL:                            IntervalRule{Unit: DAY, Num: 7},
+               SeriesIndexFlushTimeoutSeconds: 10,
+               SeriesIndexCacheMaxBytes:       1024 * 1024,
+       }
+
+       serviceCache := NewServiceCache().(*serviceCache)
+       sc := newSegmentController[mockTSTable, mockTSTableOpener](
+               ctx, tempDir, l, opts, nil, nil, time.Second,
+               
fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"), 
opts.MemoryLimit),
+               serviceCache, group,
+       )
+
+       now := time.Now().UTC()
+       startTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, 
time.UTC)
+       endTime := startTime.Add(24 * time.Hour)
+
+       segPath := filepath.Join(tempDir, 
"segment-"+startTime.Format(dayFormat))
+       require.NoError(t, os.MkdirAll(segPath, DirPerm))
+       require.NoError(t, os.WriteFile(
+               filepath.Join(segPath, metadataFilename),
+               []byte(currentVersion), FilePerm))
+
+       seg, err := sc.openSegment(ctx, startTime, endTime, segPath,
+               startTime.Format(dayFormat), sc.groupCache)
+       require.NoError(t, err)
+
+       // Bring the segment into the cold-tier residual state.
+       seg.DecRef()
+       require.Nil(t, seg.index)
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
+
+       const (
+               N      = 64
+               rounds = 10
+       )
+       for r := 0; r < rounds; r++ {
+               require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount),
+                       "round %d: segment must start at refCount=0", r)
+               require.Nil(t, seg.index, "round %d: segment.index must start 
nil", r)
+
+               var (
+                       wg     sync.WaitGroup
+                       start  = make(chan struct{})
+                       errCnt int64
+               )
+               wg.Add(N)
+               for i := 0; i < N; i++ {
+                       go func() {
+                               defer wg.Done()
+                               <-start
+                               if incErr := seg.incRef(ctx); incErr != nil {
+                                       atomic.AddInt64(&errCnt, 1)
+                               }
+                       }()
+               }
+               close(start)
+               wg.Wait()
+
+               require.Equal(t, int64(0), atomic.LoadInt64(&errCnt),
+                       "round %d: incRef must not error", r)
+               require.Equal(t, int32(N), atomic.LoadInt32(&seg.refCount),
+                       "round %d: every concurrent incRef must add exactly one 
reference", r)
+               require.NotNil(t, seg.index, "round %d: segment.index must be 
reopened", r)
+
+               // Drain references; the last DecRef must trigger performCleanup
+               // and bring the segment back to the residual state for the next
+               // round.
+               for i := 0; i < N; i++ {
+                       seg.DecRef()
+               }
+               require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount),
+                       "round %d", r)
+               require.Nil(t, seg.index,
+                       "round %d: segment.index must be cleared by 
performCleanup", r)
+       }
+}
+
+// TestSegment_ConcurrentReopenAndClose_NoPanic exercises the cold-tier
+// reopen pattern under heavy concurrency: many goroutines repeatedly call
+// selectSegments, exercise IndexDB().Stats() on each returned segment, and
+// release with DecRef. With idle-closed segments as the starting state,
+// each iteration triggers initialize() to reopen the segment and the final
+// DecRef triggers performCleanup() to close it again, so closes and
+// reopens are interleaved across goroutines.
+//
+// Verifies (run with -race):
+//   - no goroutine panics (covers both the typed-nil and refcount fixes)
+//   - no data race is detected on the segment lifecycle
+//   - all segments end up cleanly closed (refCount == 0, index == nil)
+func TestSegment_ConcurrentReopenAndClose_NoPanic(t *testing.T) {
+       tempDir, cleanup := setupTestEnvironment(t)
+       defer cleanup()
+
+       ctx := context.Background()
+       l := logger.GetLogger("test-concurrent-reopen-close")
+       ctx = context.WithValue(ctx, logger.ContextKey, l)
+       ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+               return common.Position{Database: "test-db", Stage: "test-stage"}
+       })
+
+       opts := TSDBOpts[mockTSTable, mockTSTableOpener]{
+               TSTableCreator: func(_ fs.FileSystem, _ string, _ 
common.Position, _ *logger.Logger,
+                       _ timestamp.TimeRange, _ mockTSTableOpener, _ any,
+               ) (mockTSTable, error) {
+                       return mockTSTable{ID: common.ShardID(0)}, nil
+               },
+               ShardNum:                       2,
+               SegmentInterval:                IntervalRule{Unit: DAY, Num: 1},
+               TTL:                            IntervalRule{Unit: DAY, Num: 7},
+               SeriesIndexFlushTimeoutSeconds: 10,
+               SeriesIndexCacheMaxBytes:       1024 * 1024,
+       }
+
+       serviceCache := NewServiceCache().(*serviceCache)
+       sc := newSegmentController[mockTSTable, mockTSTableOpener](
+               ctx, tempDir, l, opts, nil, nil, time.Second,
+               
fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"), 
opts.MemoryLimit),
+               serviceCache, group,
+       )
+
+       now := time.Now().UTC()
+       day1 := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, 
time.UTC)
+       day2 := day1.Add(24 * time.Hour)
+       day3 := day2.Add(24 * time.Hour)
+       days := []time.Time{day1, day2, day3}
+
+       for _, d := range days {
+               segPath := filepath.Join(tempDir, 
"segment-"+d.Format(dayFormat))
+               require.NoError(t, os.MkdirAll(segPath, DirPerm))
+               require.NoError(t, os.WriteFile(
+                       filepath.Join(segPath, metadataFilename),
+                       []byte(currentVersion), FilePerm))
+               seg, openErr := sc.openSegment(ctx, d, d.Add(24*time.Hour),
+                       segPath, d.Format(dayFormat), sc.groupCache)
+               require.NoError(t, openErr)
+               sc.Lock()
+               sc.lst = append(sc.lst, seg)
+               sc.sortLst()
+               sc.Unlock()
+               // Drop the open reference so each segment starts in the 
residual state.
+               seg.DecRef()
+               require.Nil(t, seg.index)
+       }
+
+       timeRange := timestamp.NewInclusiveTimeRange(day1, 
day3.Add(24*time.Hour))
+
+       const (
+               goroutines = 8
+               cycles     = 200
+       )
+       var (
+               wg       sync.WaitGroup
+               start    = make(chan struct{})
+               panicCnt int64
+       )
+       for i := 0; i < goroutines; i++ {
+               wg.Add(1)
+               go func(id int) {
+                       defer wg.Done()
+                       defer func() {
+                               if r := recover(); r != nil {
+                                       atomic.AddInt64(&panicCnt, 1)
+                                       t.Errorf("goroutine-%d panicked: %v", 
id, r)
+                               }
+                       }()
+                       <-start
+                       for c := 0; c < cycles; c++ {
+                               segs, selErr := sc.selectSegments(timeRange)
+                               if selErr != nil {
+                                       t.Errorf("goroutine-%d cycle %d 
selectSegments: %v", id, c, selErr)
+                                       return
+                               }
+                               for _, s := range segs {
+                                       if idx := s.IndexDB(); idx != nil {
+                                               _, _ = idx.Stats()
+                                       }
+                                       s.DecRef()
+                               }
+                       }
+               }(i)
+       }
+       close(start)
+       wg.Wait()
+
+       require.Equal(t, int64(0), atomic.LoadInt64(&panicCnt),
+               "no goroutine should panic during the concurrent reopen/close 
cycle")
+
+       sc.RLock()
+       final := append([]*segment[mockTSTable, mockTSTableOpener]{}, sc.lst...)
+       sc.RUnlock()
+       for _, s := range final {
+               require.Equal(t, int32(0), atomic.LoadInt32(&s.refCount),
+                       "segment %s leaked references", s.suffix)
+               require.Nil(t, s.index, "segment %s should be cleaned up", 
s.suffix)
+       }
+}
diff --git a/banyand/liaison/grpc/deletion.go b/banyand/liaison/grpc/deletion.go
index 4670b8ca2..9cbbfbc6d 100644
--- a/banyand/liaison/grpc/deletion.go
+++ b/banyand/liaison/grpc/deletion.go
@@ -21,6 +21,7 @@ import (
        "context"
        "errors"
        "fmt"
+       "strings"
        "sync"
        "time"
 
@@ -158,11 +159,16 @@ func (m *groupDeletionTaskManager) startDeletion(ctx 
context.Context, group stri
                return fmt.Errorf("deletion task for group %s is already in 
progress", group)
        }
 
-       dataInfo, dataErr := m.schemaRegistry.CollectDataInfo(ctx, group)
+       dataInfo, collectionErrs, dataErr := 
m.schemaRegistry.CollectDataInfo(ctx, group)
        if dataErr != nil {
                m.tasks.Delete(group)
                return fmt.Errorf("failed to collect data info for group %s: 
%w", group, dataErr)
        }
+       if len(collectionErrs) > 0 {
+               m.tasks.Delete(group)
+               return fmt.Errorf("incomplete data info for group %s, refusing 
to start deletion: %s",
+                       group, strings.Join(collectionErrs, "; "))
+       }
        var totalDataSize int64
        for _, di := range dataInfo {
                totalDataSize += di.GetDataSizeBytes()
@@ -452,10 +458,14 @@ func (m *groupDeletionTaskManager) getDeletionTask(ctx 
context.Context, group st
 }
 
 func (m *groupDeletionTaskManager) hasNonEmptyResources(ctx context.Context, 
group string) (bool, error) {
-       dataInfo, dataErr := m.schemaRegistry.CollectDataInfo(ctx, group)
+       dataInfo, collectionErrs, dataErr := 
m.schemaRegistry.CollectDataInfo(ctx, group)
        if dataErr != nil {
                return false, fmt.Errorf("failed to collect data info: %w", 
dataErr)
        }
+       if len(collectionErrs) > 0 {
+               return false, fmt.Errorf("incomplete data info for group %s, 
cannot determine emptiness: %s",
+                       group, strings.Join(collectionErrs, "; "))
+       }
        for _, di := range dataInfo {
                if di.GetDataSizeBytes() > 0 {
                        return true, nil
diff --git a/banyand/liaison/grpc/deletion_test.go 
b/banyand/liaison/grpc/deletion_test.go
index 6cf47e957..812b5f460 100644
--- a/banyand/liaison/grpc/deletion_test.go
+++ b/banyand/liaison/grpc/deletion_test.go
@@ -121,7 +121,7 @@ func TestHasNonEmptyResources(t *testing.T) {
                        defer ctrl.Finish()
 
                        mockRepo := metadata.NewMockRepo(ctrl)
-                       mockRepo.EXPECT().CollectDataInfo(gomock.Any(), 
"test-group").Return(tt.infos, nil)
+                       mockRepo.EXPECT().CollectDataInfo(gomock.Any(), 
"test-group").Return(tt.infos, nil, nil)
 
                        m := &groupDeletionTaskManager{schemaRegistry: mockRepo}
                        hasResources, checkErr := 
m.hasNonEmptyResources(context.Background(), "test-group")
@@ -129,6 +129,30 @@ func TestHasNonEmptyResources(t *testing.T) {
                        assert.Equal(t, tt.expected, hasResources)
                })
        }
+
+       // Partial-failure path: when CollectDataInfo reports per-node errors,
+       // hasNonEmptyResources must refuse to answer (returning an error)
+       // rather than silently treating the half-populated DataInfo as
+       // authoritative. A wrong "false" here would let the caller delete
+       // data on the silent node.
+       t.Run("partial failure refuses to answer", func(t *testing.T) {
+               ctrl := gomock.NewController(t)
+               defer ctrl.Finish()
+
+               mockRepo := metadata.NewMockRepo(ctrl)
+               mockRepo.EXPECT().CollectDataInfo(gomock.Any(), 
"test-group").Return(
+                       []*databasev1.DataInfo{{DataSizeBytes: 0}}, // looks 
empty
+                       []string{"node error: cold-0 panic"},       // but 
cold-0 did not report
+                       nil,
+               )
+
+               m := &groupDeletionTaskManager{schemaRegistry: mockRepo}
+               hasResources, checkErr := 
m.hasNonEmptyResources(context.Background(), "test-group")
+               require.Error(t, checkErr, "partial collection failure must 
surface as an error, not (false, nil)")
+               assert.Contains(t, checkErr.Error(), "incomplete data info")
+               assert.Contains(t, checkErr.Error(), "cold-0 panic")
+               assert.False(t, hasResources, "the bool return is irrelevant 
when the error is non-nil; caller must check err first")
+       })
 }
 
 func TestDeletion(t *testing.T) {
@@ -192,7 +216,7 @@ func TestDeletion(t *testing.T) {
                }
 
                mockRepo := metadata.NewMockRepo(ctrl)
-               mockRepo.EXPECT().CollectDataInfo(gomock.Any(), 
group).Return([]*databasev1.DataInfo{{DataSizeBytes: 512}}, nil)
+               mockRepo.EXPECT().CollectDataInfo(gomock.Any(), 
group).Return([]*databasev1.DataInfo{{DataSizeBytes: 512}}, nil, nil)
                
mockRepo.EXPECT().IndexRuleBindingRegistry().Return(&stubIndexRuleBinding{})
                mockRepo.EXPECT().IndexRuleRegistry().Return(&stubIndexRule{})
 
diff --git a/banyand/liaison/grpc/registry.go b/banyand/liaison/grpc/registry.go
index 58f365213..3b06e2d3d 100644
--- a/banyand/liaison/grpc/registry.go
+++ b/banyand/liaison/grpc/registry.go
@@ -20,6 +20,8 @@ package grpc
 import (
        "context"
        "errors"
+       "fmt"
+       "strings"
        "time"
 
        "google.golang.org/grpc/codes"
@@ -768,11 +770,16 @@ func (rs *groupRegistryServer) Inspect(ctx 
context.Context, req *databasev1.Grou
                rs.metrics.totalRegistryErr.Inc(1, g, "group", "inspect")
                return nil, schemaErr
        }
-       dataInfo, dataErr := rs.schemaRegistry.CollectDataInfo(ctx, g)
+       dataInfo, collectionErrs, dataErr := 
rs.schemaRegistry.CollectDataInfo(ctx, g)
        if dataErr != nil {
                rs.metrics.totalRegistryErr.Inc(1, g, "group", "inspect")
                return nil, dataErr
        }
+       if len(collectionErrs) > 0 {
+               rs.metrics.totalRegistryErr.Inc(1, g, "group", "inspect")
+               return nil, fmt.Errorf("incomplete data info for group %s: %s",
+                       g, strings.Join(collectionErrs, "; "))
+       }
        liaisonInfo, liaisonErr := rs.schemaRegistry.CollectLiaisonInfo(ctx, g)
        if liaisonErr != nil {
                rs.metrics.totalRegistryErr.Inc(1, g, "group", "inspect")
diff --git a/banyand/measure/metadata_internal_test.go 
b/banyand/measure/metadata_internal_test.go
new file mode 100644
index 000000000..87fa5db3a
--- /dev/null
+++ b/banyand/measure/metadata_internal_test.go
@@ -0,0 +1,48 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+)
+
+// nilIndexDBSegment is a stub Segment whose IndexDB() returns an untyped
+// nil. The embedded interface lets us satisfy the Segment[*tsTable, option]
+// signature without implementing every method (only IndexDB is exercised
+// by collectSeriesIndexInfo).
+// The embedded Segment field is left nil, so calling any Segment method
+// other than IndexDB on this stub would panic; keep its use limited.
+type nilIndexDBSegment struct {
+       storage.Segment[*tsTable, option]
+}
+
+// IndexDB returns a nil storage.IndexDB interface value (not a typed-nil
+// pointer wrapped in the interface), modelling a segment whose series
+// index has already been torn down.
+func (nilIndexDBSegment) IndexDB() storage.IndexDB { return nil }
+
+// TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB locks in the contract
+// that collectSeriesIndexInfo returns an empty SeriesIndexInfo (no panic)
+// when the segment is in the cold-tier residual state where its underlying
+// series index has been torn down by performCleanup.
+func TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB(t *testing.T) {
+       // A zero-value schemaRepo is used. NOTE(review): this assumes the
+       // nil-IndexDB branch reads no schemaRepo fields -- confirm if that changes.
+       sr := &schemaRepo{}
+       info := sr.collectSeriesIndexInfo(nilIndexDBSegment{})
+       // The result must be non-nil (not just "no panic") so callers can read
+       // DataCount/DataSizeBytes without their own nil check.
+       require.NotNil(t, info)
+       require.Equal(t, int64(0), info.DataCount)
+       require.Equal(t, int64(0), info.DataSizeBytes)
+}
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index fcd72a79a..fd11c9e46 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -502,7 +502,7 @@ func (s *clientService) Subjects(ctx context.Context, 
indexRule *databasev1.Inde
        return foundSubjects, subjectErr
 }
 
+// CollectDataInfo forwards to infoCollectorRegistry.CollectDataInfo and
+// returns its aggregated DataInfo, per-node collection errors, and
+// top-level error unchanged.
-func (s *clientService) CollectDataInfo(ctx context.Context, group string) 
([]*databasev1.DataInfo, error) {
+func (s *clientService) CollectDataInfo(ctx context.Context, group string) 
([]*databasev1.DataInfo, []string, error) {
        return s.infoCollectorRegistry.CollectDataInfo(ctx, group)
 }
 
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go
index 33865e820..59ef2332e 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/metadata.go
@@ -52,7 +52,7 @@ type Repo interface {
        RegisterHandler(string, schema.Kind, schema.EventHandler)
        NodeRegistry() schema.Node
        PropertyRegistry() schema.Property
-       CollectDataInfo(context.Context, string) ([]*databasev1.DataInfo, error)
+       CollectDataInfo(context.Context, string) ([]*databasev1.DataInfo, 
[]string, error)
        CollectLiaisonInfo(context.Context, string) ([]*databasev1.LiaisonInfo, 
error)
        DropGroup(ctx context.Context, catalog commonv1.Catalog, group string) 
error
 }
diff --git a/banyand/metadata/schema/collector.go 
b/banyand/metadata/schema/collector.go
index 9b297fd66..6ece113de 100644
--- a/banyand/metadata/schema/collector.go
+++ b/banyand/metadata/schema/collector.go
@@ -39,6 +39,20 @@ import (
 // merge/flush load can take several seconds) while still bounding the call.
 const inspectBroadcastTimeout = 30 * time.Second
 
+// Prefixes used by every producer that writes into
+// GroupLifecycleInfo.errors so downstream consumers can categorize
+// failures via a stable prefix match. The full vocabulary lives here:
+// BroadcastErrorPrefix / FutureErrorPrefix / NodeErrorPrefix are written
+// by broadcastCollectDataInfo below; TopLevelErrorPrefix is written by
+// the InspectAll RPC handler in banyand/queue/sub/group_lifecycle.go,
+// which observes the top-level error returned by CollectDataInfo.
+// Each entry is the prefix followed by the underlying error text, so match
+// on the prefix (e.g. strings.HasPrefix) rather than on the full string.
+const (
+       BroadcastErrorPrefix = "broadcast failed: "
+       FutureErrorPrefix    = "future error: "
+       NodeErrorPrefix      = "node error: "
+       TopLevelErrorPrefix  = "top-level: "
+)
+
 // GroupGetter provides method to get group metadata.
 type GroupGetter interface {
        GetGroup(ctx context.Context, group string) (*commonv1.Group, error)
@@ -72,22 +86,29 @@ func NewInfoCollectorRegistry(l *logger.Logger, groupGetter 
GroupGetter) *InfoCo
        }
 }
 
-// CollectDataInfo collects data information from both local and remote data 
nodes.
-func (icr *InfoCollectorRegistry) CollectDataInfo(ctx context.Context, group 
string) ([]*databasev1.DataInfo, error) {
+// CollectDataInfo collects data information from both local and remote data
+// nodes. Returns the aggregated DataInfo slice plus a separate slice of
+// per-node collection errors observed during broadcast (broadcast failure,
+// future errors, listener-side common.Error responses); the two slices
+// are independent -- the errors slice only contains failures and is not
+// positionally aligned with DataInfo. The error return is reserved for
+// top-level failures that prevent any aggregation (GetGroup failure,
+// local collector failure, unsupported catalog).
+func (icr *InfoCollectorRegistry) CollectDataInfo(ctx context.Context, group 
string) ([]*databasev1.DataInfo, []string, error) {
        g, getErr := icr.groupGetter.GetGroup(ctx, group)
        if getErr != nil {
-               return nil, getErr
+               return nil, nil, getErr
        }
        localInfo, localErr := icr.collectDataInfoLocal(ctx, g.Catalog, group)
        if localErr != nil {
-               return nil, localErr
+               return nil, nil, localErr
        }
        localInfoList := []*databasev1.DataInfo{}
        if localInfo != nil {
                localInfoList = []*databasev1.DataInfo{localInfo}
        }
        if icr.dataBroadcaster == nil {
-               return localInfoList, nil
+               return localInfoList, nil, nil
        }
 
        var topic bus.Topic
@@ -99,25 +120,27 @@ func (icr *InfoCollectorRegistry) CollectDataInfo(ctx 
context.Context, group str
        case commonv1.Catalog_CATALOG_TRACE:
                topic = data.TopicTraceCollectDataInfo
        default:
-               return nil, fmt.Errorf("unsupported catalog type: %v", 
g.Catalog)
+               return nil, nil, fmt.Errorf("unsupported catalog type: %v", 
g.Catalog)
        }
-       remoteInfo := icr.broadcastCollectDataInfo(topic, group)
-       return append(localInfoList, remoteInfo...), nil
+       remoteInfo, collectionErrors := icr.broadcastCollectDataInfo(topic, 
group)
+       return append(localInfoList, remoteInfo...), collectionErrors, nil
 }
 
-func (icr *InfoCollectorRegistry) broadcastCollectDataInfo(topic bus.Topic, 
group string) []*databasev1.DataInfo {
+func (icr *InfoCollectorRegistry) broadcastCollectDataInfo(topic bus.Topic, 
group string) ([]*databasev1.DataInfo, []string) {
        message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), 
&databasev1.GroupRegistryServiceInspectRequest{Group: group})
        futures, broadcastErr := 
icr.dataBroadcaster.Broadcast(inspectBroadcastTimeout, topic, message)
        if broadcastErr != nil {
                icr.l.Warn().Err(broadcastErr).Str("group", group).Msg("failed 
to broadcast collect data info request")
-               return []*databasev1.DataInfo{}
+               return []*databasev1.DataInfo{}, []string{BroadcastErrorPrefix 
+ broadcastErr.Error()}
        }
 
        dataInfoList := make([]*databasev1.DataInfo, 0, len(futures))
+       var collectionErrors []string
        for _, future := range futures {
                msg, getErr := future.Get()
                if getErr != nil {
                        icr.l.Warn().Err(getErr).Str("group", 
group).Msg("failed to get collect data info response")
+                       collectionErrors = append(collectionErrors, 
FutureErrorPrefix+getErr.Error())
                        continue
                }
                msgData := msg.Data()
@@ -128,9 +151,10 @@ func (icr *InfoCollectorRegistry) 
broadcastCollectDataInfo(topic bus.Topic, grou
                        }
                case *common.Error:
                        icr.l.Warn().Str("error", d.Error()).Str("group", 
group).Msg("error collecting data info from node")
+                       collectionErrors = append(collectionErrors, 
NodeErrorPrefix+d.Error())
                }
        }
-       return dataInfoList
+       return dataInfoList, collectionErrors
 }
 
 func (icr *InfoCollectorRegistry) collectDataInfoLocal(ctx context.Context, 
catalog commonv1.Catalog, group string) (*databasev1.DataInfo, error) {
diff --git a/banyand/queue/sub/group_lifecycle.go 
b/banyand/queue/sub/group_lifecycle.go
index 8104015da..d8d1f495b 100644
--- a/banyand/queue/sub/group_lifecycle.go
+++ b/banyand/queue/sub/group_lifecycle.go
@@ -24,6 +24,7 @@ import (
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        fodcv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
 )
 
 // maxInspectGroupConcurrency caps how many groups InspectAll will inspect
@@ -77,12 +78,19 @@ func (s *server) inspectGroup(ctx context.Context, group 
*commonv1.Group) *fodcv
                Catalog:      catalogToString(group.Catalog),
                ResourceOpts: group.ResourceOpts,
        }
-       dataInfo, err := s.metadataRepo.CollectDataInfo(ctx, groupName)
+       // CollectDataInfo's contract: a non-nil err means no DataInfo and nil
+       // collectionErrs (per banyand/metadata/schema/collector.go); a nil err
+       // means DataInfo is populated and collectionErrs may carry per-node
+       // broadcast failures. The two paths never overlap, so the top-level
+       // branch sets a fresh single-element Errors slice.
+       dataInfo, collectionErrs, err := s.metadataRepo.CollectDataInfo(ctx, 
groupName)
        if err != nil {
                s.log.Warn().Err(err).Str("group", groupName).Msg("Failed to 
collect data info")
-       } else {
-               info.DataInfo = dataInfo
+               info.Errors = []string{schema.TopLevelErrorPrefix + err.Error()}
+               return info
        }
+       info.DataInfo = dataInfo
+       info.Errors = collectionErrs
        return info
 }
 
diff --git a/banyand/queue/sub/group_lifecycle_cold_tier_test.go 
b/banyand/queue/sub/group_lifecycle_cold_tier_test.go
new file mode 100644
index 000000000..46beacd4c
--- /dev/null
+++ b/banyand/queue/sub/group_lifecycle_cold_tier_test.go
@@ -0,0 +1,298 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sub
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "sync/atomic"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       fodcv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// coldTierRepo is a metadata.Repo stub modeled on cold-tier data nodes.
+// CollectDataInfo simulates the post-fix output of 
schemaRepo.collectSeriesIndexInfo
+// for idle-closed segments: returns DataInfo whose SegmentInfo entries each 
carry
+// an empty SeriesIndexInfo (DataCount=0, DataSizeBytes=0). The hook lets tests
+// inject failures, latency, or per-call mutation to mimic the cold-tier
+// "InspectAll while closeIdleSegments is firing" race window.
+type coldTierRepo struct {
+       metadata.Repo
+       groupRegistry *mockGroupRegistry
+       hook          func(group string, callIdx int) ([]*databasev1.DataInfo, 
[]string, error)
+       callsByGroup  sync.Map // map[string]*atomic.Int64
+       totalCalls    atomic.Int64
+       concurrentNow atomic.Int32
+       concurrentMax atomic.Int32
+       panicCount    atomic.Int32
+}
+
+// GroupRegistry returns the mock group registry the stub was built with.
+func (r *coldTierRepo) GroupRegistry() schema.Group {
+       return r.groupRegistry
+}
+
+// CollectDataInfo records call and concurrency statistics, then delegates to
+// hook (with a 0-based per-group call index) when set, or returns
+// coldTierEmptyDataInfo() otherwise. A panic while producing the result is
+// recovered, counted in panicCount, and surfaced as the top-level error.
+func (r *coldTierRepo) CollectDataInfo(_ context.Context, group string) (out 
[]*databasev1.DataInfo, collectionErrs []string, err error) {
+       defer func() {
+               if rec := recover(); rec != nil {
+                       r.panicCount.Add(1)
+                       err = fmt.Errorf("panic in CollectDataInfo: %v", rec)
+               }
+       }()
+       r.totalCalls.Add(1)
+       current := r.concurrentNow.Add(1)
+       defer r.concurrentNow.Add(-1)
+       // Lock-free running maximum: retry the CAS until concurrentMax >= current.
+       for {
+               hi := r.concurrentMax.Load()
+               if current <= hi || r.concurrentMax.CompareAndSwap(hi, current) 
{
+                       break
+               }
+       }
+       // Lazily create this group's call counter; LoadOrStore keeps the first one.
+       cntAny, _ := r.callsByGroup.LoadOrStore(group, new(atomic.Int64))
+       idx := cntAny.(*atomic.Int64).Add(1)
+       if r.hook != nil {
+               return r.hook(group, int(idx-1))
+       }
+       return coldTierEmptyDataInfo(), nil, nil
+}
+
+// coldTierEmptyDataInfo mirrors the DataInfo shape the fixed schemaRepo
+// returns when every selected segment is in the idle-closed residual state
+// (s.index = nil): non-nil top-level DataInfo with one SegmentInfo entry whose
+// SeriesIndexInfo is the zero value.
+// A fresh value is built on every call, so callers may mutate it freely.
+func coldTierEmptyDataInfo() []*databasev1.DataInfo {
+       return []*databasev1.DataInfo{
+               {
+                       SegmentInfo: []*databasev1.SegmentInfo{
+                               {
+                                       SegmentId:       "seg-cold-tier-idle",
+                                       SeriesIndexInfo: 
&databasev1.SeriesIndexInfo{},
+                                       ShardInfo:       
[]*databasev1.ShardInfo{},
+                               },
+                       },
+                       DataSizeBytes: 0,
+               },
+       }
+}
+
+// newColdTierGroups builds one group per catalog, named cold_group_<index>
+// by argument position, each with 2 shards, a 1-day segment interval and a
+// 30-day TTL.
+func newColdTierGroups(catalogs ...commonv1.Catalog) []*commonv1.Group {
+       groups := make([]*commonv1.Group, 0, len(catalogs))
+       for i, cat := range catalogs {
+               groups = append(groups, &commonv1.Group{
+                       Metadata: &commonv1.Metadata{Name: 
fmt.Sprintf("cold_group_%d", i)},
+                       Catalog:  cat,
+                       ResourceOpts: &commonv1.ResourceOpts{
+                               ShardNum: 2,
+                               SegmentInterval: &commonv1.IntervalRule{
+                                       Unit: commonv1.IntervalRule_UNIT_DAY, 
Num: 1,
+                               },
+                               Ttl: &commonv1.IntervalRule{
+                                       Unit: commonv1.IntervalRule_UNIT_DAY, 
Num: 30,
+                               },
+                       },
+               })
+       }
+       return groups
+}
+
+// TestInspectAll_ColdTier_Integration simulates the cold-tier traffic
+// pattern that triggered the production nil-pointer panic on
+// demo-banyandb-data-cold-0. FODC InspectAll fan-out plus
+// closeIdleSegments leave segments in the "refCount=0, index=nil"
+// residual state. With the typed-nil and refcount fixes applied,
+// schemaRepo.collectSeriesIndexInfo short-circuits and returns an empty
+// SeriesIndexInfo rather than panicking.
+//
+// This test wires the real InspectAll RPC handler against a stub
+// metadata.Repo that mirrors the post-fix output of the storage path,
+// then drives four cold-tier scenarios end-to-end:
+//
+//     A. single InspectAll after idle-close
+//     B. repeated InspectAll cycles (cold-tier reinspect)
+//     C. concurrent InspectAll calls
+//     D. concurrent InspectAll with simulated idle-close churn between calls
+//
+// The concurrency assertions in C and D rely on the stub hook blocking
+// briefly, so overlapping CollectDataInfo calls are observable even when
+// the test runs with GOMAXPROCS=1.
+func TestInspectAll_ColdTier_Integration(t *testing.T) {
+       groups := newColdTierGroups(
+               commonv1.Catalog_CATALOG_MEASURE,
+               commonv1.Catalog_CATALOG_STREAM,
+               commonv1.Catalog_CATALOG_TRACE,
+       )
+
+       t.Run("A_SingleInspectAfterIdleClose", func(t *testing.T) {
+               repo := &coldTierRepo{groupRegistry: &mockGroupRegistry{groups: groups}}
+               s := &server{log: logger.GetLogger("cold-tier-A"), metadataRepo: repo}
+
+               resp, err := s.InspectAll(context.Background(), &fodcv1.InspectAllRequest{})
+               require.NoError(t, err)
+               require.NotNil(t, resp)
+               require.Len(t, resp.Groups, len(groups))
+
+               for _, g := range resp.Groups {
+                       require.NotEmpty(t, g.DataInfo, "group %s should expose post-fix DataInfo", g.Name)
+                       assert.Empty(t, g.Errors, "group %s must not carry collection errors", g.Name)
+                       for _, di := range g.DataInfo {
+                               require.NotNil(t, di)
+                               for _, seg := range di.SegmentInfo {
+                                       require.NotNil(t, seg.SeriesIndexInfo,
+                                               "every segment must carry a non-nil (possibly empty) SeriesIndexInfo")
+                                       assert.Equal(t, int64(0), seg.SeriesIndexInfo.DataCount)
+                                       assert.Equal(t, int64(0), seg.SeriesIndexInfo.DataSizeBytes)
+                               }
+                       }
+               }
+               assert.Equal(t, int32(0), repo.panicCount.Load(), "no goroutine should panic")
+       })
+
+       t.Run("B_RepeatedInspectAll_ColdTierReinspect", func(t *testing.T) {
+               repo := &coldTierRepo{groupRegistry: &mockGroupRegistry{groups: groups}}
+               s := &server{log: logger.GetLogger("cold-tier-B"), metadataRepo: repo}
+
+               const cycles = 25
+               for i := 0; i < cycles; i++ {
+                       resp, err := s.InspectAll(context.Background(), &fodcv1.InspectAllRequest{})
+                       require.NoError(t, err, "cycle %d", i)
+                       require.Len(t, resp.Groups, len(groups), "cycle %d returned wrong group count", i)
+               }
+
+               // Every group must have been visited exactly `cycles` times -- no state
+               // leaks across repeated InspectAll calls (this is the "re-fetch" check).
+               for _, g := range groups {
+                       cntAny, ok := repo.callsByGroup.Load(g.Metadata.Name)
+                       require.True(t, ok, "group %s missing call counter", g.Metadata.Name)
+                       assert.Equal(t, int64(cycles), cntAny.(*atomic.Int64).Load(),
+                               "group %s expected exactly %d calls", g.Metadata.Name, cycles)
+               }
+               assert.Equal(t, int64(cycles*len(groups)), repo.totalCalls.Load())
+               assert.Equal(t, int32(0), repo.panicCount.Load())
+       })
+
+       t.Run("C_ConcurrentInspectAll", func(t *testing.T) {
+               // Without a blocking point each CollectDataInfo call completes in
+               // microseconds, so under GOMAXPROCS=1 the callers would typically run
+               // back to back and concurrentMax would stay at 1. A short sleep parks
+               // the goroutine and lets the scheduler start other CollectDataInfo calls.
+               slowHook := func(string, int) ([]*databasev1.DataInfo, []string, error) {
+                       time.Sleep(time.Millisecond)
+                       return coldTierEmptyDataInfo(), nil, nil
+               }
+               repo := &coldTierRepo{
+                       groupRegistry: &mockGroupRegistry{groups: groups},
+                       hook:          slowHook,
+               }
+               s := &server{log: logger.GetLogger("cold-tier-C"), metadataRepo: repo}
+
+               const callers = 8
+               var (
+                       wg       sync.WaitGroup
+                       start    = make(chan struct{})
+                       panicCnt int64
+               )
+               for i := 0; i < callers; i++ {
+                       wg.Add(1)
+                       go func(id int) {
+                               defer wg.Done()
+                               defer func() {
+                                       if r := recover(); r != nil {
+                                               atomic.AddInt64(&panicCnt, 1)
+                                               t.Errorf("caller-%d panicked: %v", id, r)
+                                       }
+                               }()
+                               <-start
+                               resp, err := s.InspectAll(context.Background(), &fodcv1.InspectAllRequest{})
+                               if !assert.NoError(t, err, "caller-%d", id) {
+                                       return
+                               }
+                               if !assert.NotNil(t, resp, "caller-%d", id) {
+                                       return
+                               }
+                               assert.Len(t, resp.Groups, len(groups), "caller-%d", id)
+                       }(i)
+               }
+               close(start)
+               wg.Wait()
+
+               assert.Equal(t, int64(0), panicCnt, "no caller may panic")
+               assert.Equal(t, int32(0), repo.panicCount.Load())
+               assert.Equal(t, int64(callers*len(groups)), repo.totalCalls.Load())
+               // Per-group fan-out cap is 32, so at least two groups should have
+               // overlapped within at least one InspectAll invocation.
+               assert.GreaterOrEqual(t, repo.concurrentMax.Load(), int32(2),
+                       "concurrent InspectAll should overlap at least two CollectDataInfo calls")
+       })
+
+       t.Run("D_ConcurrentInspectAll_WithIdleCloseChurn", func(t *testing.T) {
+               // brittleHook simulates the cold-tier race window: every call whose
+               // per-group index is a positive multiple of 17 fails with a benign
+               // top-level error (mimicking transient disk pressure); all other calls
+               // sleep briefly to amplify scheduling jitter and then return the empty
+               // idle-closed payload.
+               brittleHook := func(group string, callIdx int) ([]*databasev1.DataInfo, []string, error) {
+                       if callIdx%17 == 0 && callIdx > 0 {
+                               return nil, nil, fmt.Errorf("simulated transient close error on %s call#%d", group, callIdx)
+                       }
+                       time.Sleep(50 * time.Microsecond)
+                       return coldTierEmptyDataInfo(), nil, nil
+               }
+               repo := &coldTierRepo{
+                       groupRegistry: &mockGroupRegistry{groups: groups},
+                       hook:          brittleHook,
+               }
+               s := &server{log: logger.GetLogger("cold-tier-D"), metadataRepo: repo}
+
+               const callers = 6
+               const cycles = 20
+               var (
+                       wg       sync.WaitGroup
+                       start    = make(chan struct{})
+                       panicCnt int64
+               )
+               // erroredGroups counts group entries, across all responses, that
+               // carried a collection error.
+               var erroredGroups atomic.Int64
+               for i := 0; i < callers; i++ {
+                       wg.Add(1)
+                       go func(id int) {
+                               defer wg.Done()
+                               defer func() {
+                                       if r := recover(); r != nil {
+                                               atomic.AddInt64(&panicCnt, 1)
+                                               t.Errorf("caller-%d panicked: %v", id, r)
+                                       }
+                               }()
+                               <-start
+                               for c := 0; c < cycles; c++ {
+                                       resp, err := s.InspectAll(context.Background(), &fodcv1.InspectAllRequest{})
+                                       // require.* would FailNow inside this goroutine, which
+                                       // is forbidden by the testing package; use assert and
+                                       // bail out early on the first failure.
+                                       if !assert.NoError(t, err, "caller-%d cycle-%d", id, c) {
+                                               return
+                                       }
+                                       if !assert.NotNil(t, resp, "caller-%d cycle-%d", id, c) {
+                                               return
+                                       }
+                                       if !assert.Len(t, resp.Groups, len(groups), "caller-%d cycle-%d", id, c) {
+                                               return
+                                       }
+                                       // A hook failure must be reported per group through Errors,
+                                       // never as an RPC error or alongside a half-filled DataInfo.
+                                       for _, g := range resp.Groups {
+                                               if len(g.Errors) == 0 {
+                                                       continue
+                                               }
+                                               erroredGroups.Add(1)
+                                               assert.Empty(t, g.DataInfo, "caller-%d cycle-%d group %s", id, c, g.Name)
+                                               assert.Contains(t, g.Errors[0], schema.TopLevelErrorPrefix,
+                                                       "caller-%d cycle-%d group %s", id, c, g.Name)
+                                       }
+                               }
+                       }(i)
+               }
+               close(start)
+               wg.Wait()
+
+               require.Equal(t, int64(0), panicCnt, "no caller may panic")
+               require.Equal(t, int32(0), repo.panicCount.Load(),
+                       "CollectDataInfo must not panic under the cold-tier churn")
+               // Each group receives callers*cycles calls with 0-based indices, and the
+               // hook fails on every positive multiple of 17 among them.
+               expectedErrored := int64(len(groups) * ((callers*cycles - 1) / 17))
+               assert.Equal(t, expectedErrored, erroredGroups.Load(),
+                       "every simulated failure must surface as exactly one errored group entry")
+               assert.GreaterOrEqual(t, repo.concurrentMax.Load(), int32(2),
+                       "concurrent InspectAll under churn should overlap at least two CollectDataInfo calls")
+       })
+}
diff --git a/banyand/queue/sub/group_lifecycle_test.go 
b/banyand/queue/sub/group_lifecycle_test.go
index f3a5cd6ff..4f912b323 100644
--- a/banyand/queue/sub/group_lifecycle_test.go
+++ b/banyand/queue/sub/group_lifecycle_test.go
@@ -47,20 +47,21 @@ func (m *mockGroupRegistry) ListGroup(_ context.Context) 
([]*commonv1.Group, err
 
+// mockMetadataRepo is a metadata.Repo stub whose CollectDataInfo answers from
+// the per-group dataInfoErr, dataInfoMap and collectionErrors maps, and which
+// tracks how many CollectDataInfo calls started and ran concurrently.
 type mockMetadataRepo struct {
        metadata.Repo
-       groupRegistry *mockGroupRegistry
-       dataInfoMap   map[string][]*databasev1.DataInfo
-       dataInfoErr   map[string]error
-       slowGroups    map[string]time.Duration
-       collectStarts atomic.Int32
-       concurrentMax atomic.Int32
-       concurrentNow atomic.Int32
+       groupRegistry    *mockGroupRegistry
+       dataInfoMap      map[string][]*databasev1.DataInfo
+       dataInfoErr      map[string]error
+       collectionErrors map[string][]string
+       slowGroups       map[string]time.Duration
+       collectStarts    atomic.Int32
+       concurrentMax    atomic.Int32
+       concurrentNow    atomic.Int32
 }
 
+// GroupRegistry returns the mock group registry configured on the stub.
 func (m *mockMetadataRepo) GroupRegistry() schema.Group {
        return m.groupRegistry
 }
 
-func (m *mockMetadataRepo) CollectDataInfo(_ context.Context, group string) 
([]*databasev1.DataInfo, error) {
+func (m *mockMetadataRepo) CollectDataInfo(_ context.Context, group string) 
([]*databasev1.DataInfo, []string, error) {
        m.collectStarts.Add(1)
        current := m.concurrentNow.Add(1)
        defer m.concurrentNow.Add(-1)
@@ -77,13 +78,18 @@ func (m *mockMetadataRepo) CollectDataInfo(_ 
context.Context, group string) ([]*
        }
        if m.dataInfoErr != nil {
                if err, ok := m.dataInfoErr[group]; ok {
-                       return nil, err
+                       return nil, nil, err
                }
        }
+       var dataInfo []*databasev1.DataInfo
        if m.dataInfoMap != nil {
-               return m.dataInfoMap[group], nil
+               dataInfo = m.dataInfoMap[group]
        }
-       return nil, nil
+       var collectionErrs []string
+       if m.collectionErrors != nil {
+               collectionErrs = m.collectionErrors[group]
+       }
+       return dataInfo, collectionErrs, nil
 }
 
 func TestCatalogToString(t *testing.T) {
@@ -264,4 +270,92 @@ func TestInspectAll_RunsGroupsInParallel(t *testing.T) {
        assert.GreaterOrEqual(t, repo.concurrentMax.Load(), int32(2),
                "at least two CollectDataInfo invocations must overlap; 
observed max concurrency was %d",
                repo.concurrentMax.Load())
+       for _, g := range resp.Groups {
+               assert.Empty(t, g.Errors, "successful group %s must not carry 
errors", g.Name)
+       }
+}
+
+// TestInspectAll_SurfacesTopLevelError asserts that a top-level
+// CollectDataInfo error is reported through GroupLifecycleInfo.errors with
+// a "top-level:" prefix and that DataInfo stays empty for that group.
+func TestInspectAll_SurfacesTopLevelError(t *testing.T) {
+       repo := &mockMetadataRepo{
+               groupRegistry: &mockGroupRegistry{
+                       groups: []*commonv1.Group{
+                               {Metadata: &commonv1.Metadata{Name: 
"ok_group"}, Catalog: commonv1.Catalog_CATALOG_MEASURE},
+                               {Metadata: &commonv1.Metadata{Name: 
"bad_group"}, Catalog: commonv1.Catalog_CATALOG_STREAM},
+                       },
+               },
+               dataInfoMap: map[string][]*databasev1.DataInfo{
+                       "ok_group": {{DataSizeBytes: 1024}},
+               },
+               dataInfoErr: map[string]error{
+                       "bad_group": fmt.Errorf("metadataRepo: simulated load 
failure"),
+               },
+       }
+       s := &server{log: logger.GetLogger("test"), metadataRepo: repo}
+
+       resp, err := s.InspectAll(context.Background(), 
&fodcv1.InspectAllRequest{})
+       require.NoError(t, err)
+       require.Len(t, resp.Groups, 2)
+
+       byName := map[string]*fodcv1.GroupLifecycleInfo{}
+       for _, g := range resp.Groups {
+               byName[g.Name] = g
+       }
+
+       ok := byName["ok_group"]
+       require.NotNil(t, ok)
+       assert.Empty(t, ok.Errors, "ok_group must not carry errors")
+       assert.NotEmpty(t, ok.DataInfo)
+
+       bad := byName["bad_group"]
+       require.NotNil(t, bad)
+       assert.Empty(t, bad.DataInfo, "failed group must not expose stale 
DataInfo")
+       require.NotEmpty(t, bad.Errors, "failed group must surface the 
CollectDataInfo error")
+       assert.Contains(t, bad.Errors[0], "top-level:",
+               "top-level failures must be tagged with the top-level prefix")
+       assert.Contains(t, bad.Errors[0], "simulated load failure",
+               "the original error message must be preserved")
+}
+
+// TestInspectAll_PropagatesPartialFailureErrors asserts that per-node
+// failures reported by mockMetadataRepo.collectionErrors are passed
+// through to GroupLifecycleInfo.errors for the originating group, while
+// DataInfo still carries the entries from nodes that succeeded.
+func TestInspectAll_PropagatesPartialFailureErrors(t *testing.T) {
+       repo := &mockMetadataRepo{
+               groupRegistry: &mockGroupRegistry{
+                       groups: []*commonv1.Group{
+                               {Metadata: &commonv1.Metadata{Name: 
"partial_group"}, Catalog: commonv1.Catalog_CATALOG_MEASURE},
+                       },
+               },
+               dataInfoMap: map[string][]*databasev1.DataInfo{
+                       "partial_group": {{DataSizeBytes: 512}},
+               },
+               collectionErrors: map[string][]string{
+                       "partial_group": {
+                               "future error: rpc error: nil pointer 
dereference",
+                               "node error: cold-0 panic",
+                       },
+               },
+       }
+       s := &server{log: logger.GetLogger("test"), metadataRepo: repo}
+
+       resp, err := s.InspectAll(context.Background(), 
&fodcv1.InspectAllRequest{})
+       require.NoError(t, err)
+       require.Len(t, resp.Groups, 1)
+       g := resp.Groups[0]
+       assert.NotEmpty(t, g.DataInfo, "partial success must still expose the 
DataInfo entries it did get")
+       assert.Equal(t,
+               []string{
+                       "future error: rpc error: nil pointer dereference",
+                       "node error: cold-0 panic",
+               },
+               g.Errors,
+               "per-node collection errors must be passed through to 
GroupLifecycleInfo.errors")
+       for _, e := range g.Errors {
+               assert.NotContains(t, e, "top-level:",
+                       "partial-success errors must not carry the top-level: 
prefix")
+       }
 }
diff --git a/banyand/stream/metadata_internal_test.go 
b/banyand/stream/metadata_internal_test.go
new file mode 100644
index 000000000..a292b6704
--- /dev/null
+++ b/banyand/stream/metadata_internal_test.go
@@ -0,0 +1,48 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+)
+
+// nilIndexDBSegment is a stub Segment whose IndexDB() returns an untyped
+// nil. The embedded interface lets us satisfy the Segment[*tsTable, option]
+// signature without implementing every method (only IndexDB is exercised
+// by collectSeriesIndexInfo).
+type nilIndexDBSegment struct {
+       storage.Segment[*tsTable, option]
+}
+
+func (nilIndexDBSegment) IndexDB() storage.IndexDB { return nil }
+
+// TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB locks in the contract
+// that collectSeriesIndexInfo returns an empty SeriesIndexInfo (no panic)
+// when the segment is in the cold-tier residual state where its underlying
+// series index has been torn down by performCleanup.
+func TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB(t *testing.T) {
+       sr := &schemaRepo{}
+       info := sr.collectSeriesIndexInfo(nilIndexDBSegment{})
+       require.NotNil(t, info)
+       require.Equal(t, int64(0), info.DataCount)
+       require.Equal(t, int64(0), info.DataSizeBytes)
+}
diff --git a/banyand/trace/metadata_internal_test.go 
b/banyand/trace/metadata_internal_test.go
new file mode 100644
index 000000000..378078907
--- /dev/null
+++ b/banyand/trace/metadata_internal_test.go
@@ -0,0 +1,48 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+)
+
+// nilIndexDBSegment is a stub Segment whose IndexDB() returns an untyped
+// nil. The embedded interface lets us satisfy the Segment[*tsTable, option]
+// signature without implementing every method (only IndexDB is exercised
+// by collectSeriesIndexInfo).
+type nilIndexDBSegment struct {
+       storage.Segment[*tsTable, option]
+}
+
+func (nilIndexDBSegment) IndexDB() storage.IndexDB { return nil }
+
+// TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB locks in the contract
+// that collectSeriesIndexInfo returns an empty SeriesIndexInfo (no panic)
+// when the segment is in the cold-tier residual state where its underlying
+// series index has been torn down by performCleanup.
+func TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB(t *testing.T) {
+       sr := &schemaRepo{}
+       info := sr.collectSeriesIndexInfo(nilIndexDBSegment{})
+       require.NotNil(t, info)
+       require.Equal(t, int64(0), info.DataCount)
+       require.Equal(t, int64(0), info.DataSizeBytes)
+}
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 631bd679c..04885839a 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -5282,6 +5282,7 @@ Phase represents the current phase of the deletion task.
 | catalog | [string](#string) |  |  |
 | resource_opts | 
[banyandb.common.v1.ResourceOpts](#banyandb-common-v1-ResourceOpts) |  |  |
 | data_info | [banyandb.database.v1.DataInfo](#banyandb-database-v1-DataInfo) 
| repeated |  |
+| errors | [string](#string) | repeated | errors lists every failure observed 
while collecting data_info for this group: top-level CollectDataInfo failures 
(GetGroup, missing collector, dial failure -- prefixed &#34;top-level: &#34;) 
and per-node broadcast failures (prefixed &#34;future error: &#34;, &#34;node 
error: &#34;, &#34;broadcast failed: &#34;). Combined with len(data_info), 
consumers can tell the following four states apart: - data_info empty 
&amp;&amp; errors empty -&gt; no no [...]
 
 
 
diff --git a/fodc/proxy/internal/lifecycle/manager.go 
b/fodc/proxy/internal/lifecycle/manager.go
index 23c7bc9db..d17128b8c 100644
--- a/fodc/proxy/internal/lifecycle/manager.go
+++ b/fodc/proxy/internal/lifecycle/manager.go
@@ -20,6 +20,7 @@ package lifecycle
 
 import (
        "context"
+       "sort"
        "sync"
 
        fodcv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
@@ -258,16 +259,41 @@ func (m *Manager) buildInspectionResult(allData 
[]*agentLifecycleData) *Inspecti
 
 func (m *Manager) mergeGroups(allData []*agentLifecycleData) 
[]*fodcv1.GroupLifecycleInfo {
        groupMap := make(map[string]*fodcv1.GroupLifecycleInfo)
+       // Errors are unioned across agents because each agent may observe a
+       // different subset of per-node failures (e.g. liaison-0 sees cold-0
+       // time out while liaison-1 sees cold-0 panic). Last-wins on the rest
+       // of the GroupLifecycleInfo is fine -- name/catalog/resource_opts are
+       // agent-invariant -- but errors must be the deduped union.
+       mergedErrors := make(map[string]map[string]struct{})
        for _, ad := range allData {
                if ad == nil || ad.Data == nil {
                        continue
                }
                for _, g := range ad.Data.Groups {
                        groupMap[g.Name] = g
+                       if len(g.Errors) == 0 {
+                               continue
+                       }
+                       set, ok := mergedErrors[g.Name]
+                       if !ok {
+                               set = make(map[string]struct{})
+                               mergedErrors[g.Name] = set
+                       }
+                       for _, e := range g.Errors {
+                               set[e] = struct{}{}
+                       }
                }
        }
        groups := make([]*fodcv1.GroupLifecycleInfo, 0, len(groupMap))
-       for _, g := range groupMap {
+       for name, g := range groupMap {
+               if set := mergedErrors[name]; len(set) > 0 {
+                       errs := make([]string, 0, len(set))
+                       for e := range set {
+                               errs = append(errs, e)
+                       }
+                       sort.Strings(errs)
+                       g.Errors = errs
+               }
                groups = append(groups, g)
        }
        return groups
diff --git a/fodc/proxy/internal/lifecycle/manager_test.go 
b/fodc/proxy/internal/lifecycle/manager_test.go
index 55a76f166..3f1117705 100644
--- a/fodc/proxy/internal/lifecycle/manager_test.go
+++ b/fodc/proxy/internal/lifecycle/manager_test.go
@@ -394,3 +394,64 @@ func TestManager_CollectLifecycle_ContextCanceled(t 
*testing.T) {
        result, _ := mgr.CollectLifecycle(ctx)
        require.NotNil(t, result)
 }
+
+// TestManager_MergeGroups_UnionsErrorsAcrossAgents verifies that when
+// multiple agents report the same group, the errors observed by each
+// agent are unioned (deduped, sorted) rather than overwritten last-wins,
+// so downstream consumers see every per-node failure regardless of which
+// agent's view they happened to land on.
+func TestManager_MergeGroups_UnionsErrorsAcrossAgents(t *testing.T) {
+       log := initTestLogger(t)
+       mgr := NewManager(nil, nil, log)
+
+       groupName := "sw_metric"
+       agentA := &agentLifecycleData{
+               PodName: "liaison-0",
+               Data: &fodcv1.LifecycleData{
+                       Groups: []*fodcv1.GroupLifecycleInfo{{
+                               Name:    groupName,
+                               Catalog: "CATALOG_MEASURE",
+                               Errors: []string{
+                                       "node error: cold-0 panic",
+                                       "future error: rpc deadline",
+                               },
+                       }},
+               },
+       }
+       agentB := &agentLifecycleData{
+               PodName: "liaison-1",
+               Data: &fodcv1.LifecycleData{
+                       Groups: []*fodcv1.GroupLifecycleInfo{{
+                               Name:    groupName,
+                               Catalog: "CATALOG_MEASURE",
+                               Errors: []string{
+                                       "node error: cold-0 panic", // 
duplicate of A's first
+                                       "broadcast failed: dial timeout",
+                               },
+                       }},
+               },
+       }
+       agentC := &agentLifecycleData{
+               PodName: "liaison-2",
+               Data: &fodcv1.LifecycleData{
+                       Groups: []*fodcv1.GroupLifecycleInfo{{
+                               Name:    groupName,
+                               Catalog: "CATALOG_MEASURE",
+                               // no errors
+                       }},
+               },
+       }
+
+       merged := mgr.mergeGroups([]*agentLifecycleData{agentA, agentB, agentC})
+       require.Len(t, merged, 1)
+       g := merged[0]
+       assert.Equal(t, groupName, g.Name)
+       assert.Equal(t,
+               []string{
+                       "broadcast failed: dial timeout",
+                       "future error: rpc deadline",
+                       "node error: cold-0 panic",
+               },
+               g.Errors,
+               "errors must be the deduped, sorted union of every agent's 
view")
+}

Reply via email to