This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch v0.10.x in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 6ce81416252e9b73f3da7e0c4e896aee408acc6a Author: mrproliu <[email protected]> AuthorDate: Thu May 7 11:23:26 2026 +0800 Fix fetch segment nil pointer in cold stage and propagate the error when inspecting the group (#1115) --- CHANGES.md | 2 + api/proto/banyandb/fodc/v1/rpc.proto | 11 + banyand/internal/storage/index.go | 49 +++- banyand/internal/storage/segment.go | 42 ++- banyand/internal/storage/segment_test.go | 318 +++++++++++++++++++++ banyand/liaison/grpc/deletion.go | 14 +- banyand/liaison/grpc/deletion_test.go | 28 +- banyand/liaison/grpc/registry.go | 9 +- banyand/measure/metadata_internal_test.go | 48 ++++ banyand/metadata/client.go | 2 +- banyand/metadata/metadata.go | 2 +- banyand/metadata/schema/collector.go | 46 ++- banyand/queue/sub/group_lifecycle.go | 14 +- .../queue/sub/group_lifecycle_cold_tier_test.go | 298 +++++++++++++++++++ banyand/queue/sub/group_lifecycle_test.go | 116 +++++++- banyand/stream/metadata_internal_test.go | 48 ++++ banyand/trace/metadata_internal_test.go | 48 ++++ docs/api-reference.md | 1 + fodc/proxy/internal/lifecycle/manager.go | 28 +- fodc/proxy/internal/lifecycle/manager_test.go | 61 ++++ 20 files changed, 1146 insertions(+), 39 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index c4b2c5cb7..5cd522b15 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -32,6 +32,8 @@ Release Notes. - Fix FODC lifecycle cache poisoning where transient `InspectAll` failures were cached for 10 minutes and masked liaison recovery; raise FODC agent and proxy timeouts from 10s to 40s. - Fix FODC `/cluster/lifecycle` dropping zero-valued group fields (e.g. `replicas=0`, `close=false`) under `encoding/json` + `omitempty`; switch to `protojson` so all fields are emitted (nil nested messages serialize as `null`). - Fix trace `block_writer` panic on out-of-order timestamps within the same traceID, which dropped one trace-write batch per panic in multi-agent SkyWalking deployments. 
Spans of a single trace originate from independently-clocked services, and trace storage is organized by traceID rather than timestamp, so per-traceID timestamp monotonicity is not a writer invariant. +- Fix nil-pointer panic on cold-tier data nodes when FODC `InspectAll` raced with idle-segment cleanup. +- Add `GroupLifecycleInfo.errors` to surface per-group collection failures from FODC `InspectAll` instead of silently dropping the affected node entry. ### Chores diff --git a/api/proto/banyandb/fodc/v1/rpc.proto b/api/proto/banyandb/fodc/v1/rpc.proto index dd7a5c67a..9315da8a6 100644 --- a/api/proto/banyandb/fodc/v1/rpc.proto +++ b/api/proto/banyandb/fodc/v1/rpc.proto @@ -114,6 +114,17 @@ message GroupLifecycleInfo { string catalog = 2; banyandb.common.v1.ResourceOpts resource_opts = 3; repeated banyandb.database.v1.DataInfo data_info = 4; + // errors lists every failure observed while collecting data_info for this + // group: top-level CollectDataInfo failures (GetGroup, missing collector, + // dial failure -- prefixed "top-level: ") and per-node broadcast failures + // (prefixed "future error: ", "node error: ", "broadcast failed: "). 
+ // Combined with len(data_info), consumers can tell the following four + // states apart: + // - data_info empty && errors empty -> no nodes reported (group inactive) + // - data_info empty && errors non-empty -> total failure + // - data_info non-empty && errors empty -> full success + // - data_info non-empty && errors non-empty -> partial failure + repeated string errors = 5; } message LifecycleReport { diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go index dff32d742..3ff0ae41c 100644 --- a/banyand/internal/storage/index.go +++ b/banyand/internal/storage/index.go @@ -37,14 +37,33 @@ import ( const seriesIndexDirName = "sidx" func (s *segment[T, O]) IndexDB() IndexDB { - return s.index + // Snapshot s.index into a local: the field can be flipped to nil by + // performCleanup at any time, so a second read could see a different + // value. Returning the local also avoids boxing a nil *seriesIndex + // into a typed-nil IndexDB interface, which would defeat the + // `if indexDB == nil` guard on the caller side. + idx := s.index + if idx == nil { + return nil + } + return idx } func (s *segment[T, O]) Lookup(ctx context.Context, series []*pbv1.Series) (pbv1.SeriesList, error) { - sl, err := s.index.filter(ctx, series, nil, nil, nil) + idx := s.index + if idx == nil { + return nil, ErrSegmentClosed + } + sl, err := idx.filter(ctx, series, nil, nil, nil) return sl.SeriesList, err } +// seriesIndex is the per-segment series index. Every method on this type +// starts with `if s == nil { ... }` purely to absorb the typed-nil leak +// that `segment.IndexDB()` could expose if a caller bypassed the producer +// guard there. The guards intentionally do NOT make the type fully +// nil-safe -- they assume that on a non-nil receiver, store / l / metrics +// / p are all populated by newSeriesIndex (the only constructor). 
type seriesIndex struct { store index.SeriesStore l *logger.Logger @@ -79,28 +98,45 @@ func newSeriesIndex(ctx context.Context, root string, flushTimeoutSeconds int64, } func (s *seriesIndex) Insert(docs index.Documents) error { + if s == nil { + return ErrSegmentClosed + } return s.store.InsertSeriesBatch(index.Batch{ Documents: docs, }) } func (s *seriesIndex) Update(docs index.Documents) error { + if s == nil { + return ErrSegmentClosed + } return s.store.UpdateSeriesBatch(index.Batch{ Documents: docs, }) } func (s *seriesIndex) EnableExternalSegments() (index.ExternalSegmentStreamer, error) { + if s == nil { + return nil, ErrSegmentClosed + } return s.store.EnableExternalSegments() } +// Stats degrades to (0, 0) on a nil receiver as defense-in-depth in case +// a typed-nil interface leaks past segment.IndexDB(). func (s *seriesIndex) Stats() (dataCount int64, dataSizeBytes int64) { + if s == nil { + return 0, 0 + } return s.store.Stats() } func (s *seriesIndex) filter(ctx context.Context, series []*pbv1.Series, projection []index.FieldKey, secondaryQuery index.Query, timeRange *timestamp.TimeRange, ) (data SeriesData, err error) { + if s == nil { + return SeriesData{}, ErrSegmentClosed + } if len(series) == 0 && secondaryQuery == nil && timeRange == nil { return data, nil } @@ -231,6 +267,9 @@ func convertIndexSeriesToSeriesList(indexSeries []index.SeriesDocument, hasField func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts IndexSearchOpts, ) (sd SeriesData, sortedValues [][]byte, err error) { + if s == nil { + return SeriesData{}, nil, ErrSegmentClosed + } tracer := query.GetTracer(ctx) if tracer != nil { var span *query.Span @@ -309,6 +348,9 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In } func (s *seriesIndex) SearchWithoutSeries(ctx context.Context, opts IndexSearchOpts) (sd SeriesData, sortedValues [][]byte, err error) { + if s == nil { + return SeriesData{}, nil, ErrSegmentClosed + } tracer := 
query.GetTracer(ctx) if tracer != nil { var span *query.Span @@ -384,6 +426,9 @@ func (s *seriesIndex) SearchWithoutSeries(ctx context.Context, opts IndexSearchO } func (s *seriesIndex) Close() error { + if s == nil { + return nil + } s.metrics.DeleteAll(s.p.SegLabelValues()...) return s.store.Close() } diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go index 381a11478..12cc0fed9 100644 --- a/banyand/internal/storage/segment.go +++ b/banyand/internal/storage/segment.go @@ -157,13 +157,41 @@ func (s *segment[T, O]) TablesWithShardIDs() (tt []T, shardIDs []common.ShardID, return tt, shardIDs, cc } +// incRef acquires one reference on the segment, opening it via initialize +// if it is currently closed (refCount<=0). On return, the caller is +// guaranteed that the segment is alive (s.index != nil, shards loaded) and +// will stay alive until the matching DecRef. +// +// The CAS loop closes a TOCTOU race that the older "Load + AddInt32" +// shape suffered from: a goroutine could read refCount=1, race against a +// concurrent DecRef that drives it to 0 and triggers performCleanup +// (which sets s.index=nil and tears down shards), and then AddInt32 it +// back to 1. The caller would walk away with refCount=1 -- formally a +// valid reference -- but pointing at an already-cleaned-up segment, so +// every subsequent IndexDB() / Tables() call would observe zero state. +// +// The CAS variant retries whenever a concurrent DecRef beats us to the +// counter: if CAS fails, we re-Load and either succeed at the new value +// or fall through to initialize() to reopen the segment under s.mu. +// Symmetric to the CAS loop in DecRef, which guarantees the dual: a +// DecRef that drives refCount to 0 sees no concurrent +1 sneak in +// before performCleanup runs. 
func (s *segment[T, O]) incRef(ctx context.Context) error { s.lastAccessed.Store(time.Now().UnixNano()) - if atomic.LoadInt32(&s.refCount) <= 0 { - return s.initialize(ctx) + for { + current := atomic.LoadInt32(&s.refCount) + if current <= 0 { + // Either the segment was never opened or DecRef just drove + // refCount to 0; reopen under the mutex. + return s.initialize(ctx) + } + // CAS so a concurrent DecRef cannot flip refCount to 0 and run + // performCleanup between our Load and our increment. On failure + // we re-Load and re-evaluate the branch. + if atomic.CompareAndSwapInt32(&s.refCount, current, current+1) { + return nil + } } - atomic.AddInt32(&s.refCount, 1) - return nil } func (s *segment[T, O]) initialize(ctx context.Context) error { @@ -171,6 +199,12 @@ func (s *segment[T, O]) initialize(ctx context.Context) error { defer s.mu.Unlock() if atomic.LoadInt32(&s.refCount) > 0 { + // Another goroutine reopened the segment while we waited for the + // mutex. Add the +1 our caller (incRef) skipped before entering + // the slow path; otherwise concurrent reopens would share one + // refCount and the first DecRef would cleanup while we are still + // using it. + atomic.AddInt32(&s.refCount, 1) return nil } diff --git a/banyand/internal/storage/segment_test.go b/banyand/internal/storage/segment_test.go index 9a55dd358..f50e07603 100644 --- a/banyand/internal/storage/segment_test.go +++ b/banyand/internal/storage/segment_test.go @@ -24,6 +24,8 @@ import ( "os" "path/filepath" "strconv" + "sync" + "sync/atomic" "testing" "time" @@ -860,3 +862,319 @@ func TestOpenFallbackOldFormatMetadata(t *testing.T) { seg.DecRef() } } + +// TestSegment_IndexDB_ReturnsUntypedNilWhenClosed is a regression guard for +// the cold-tier nil-pointer panic. After a segment has been idle-closed +// (s.index == nil), segment.IndexDB() must return an untyped nil interface +// so that the standard caller pattern `if indexDB == nil { ... }` short +// circuits correctly. 
Returning the underlying *seriesIndex would box a +// typed-nil interface, defeat the nil check, and crash on the next method +// call. +func TestSegment_IndexDB_ReturnsUntypedNilWhenClosed(t *testing.T) { + tempDir, cleanup := setupTestEnvironment(t) + defer cleanup() + + ctx := context.Background() + l := logger.GetLogger("test-untyped-nil") + ctx = context.WithValue(ctx, logger.ContextKey, l) + ctx = common.SetPosition(ctx, func(_ common.Position) common.Position { + return common.Position{Database: "test-db", Stage: "test-stage"} + }) + + opts := TSDBOpts[mockTSTable, mockTSTableOpener]{ + TSTableCreator: func(_ fs.FileSystem, _ string, _ common.Position, _ *logger.Logger, + _ timestamp.TimeRange, _ mockTSTableOpener, _ any, + ) (mockTSTable, error) { + return mockTSTable{ID: common.ShardID(0)}, nil + }, + ShardNum: 1, + SegmentInterval: IntervalRule{Unit: DAY, Num: 1}, + TTL: IntervalRule{Unit: DAY, Num: 7}, + SeriesIndexFlushTimeoutSeconds: 10, + SeriesIndexCacheMaxBytes: 1024 * 1024, + } + + serviceCache := NewServiceCache().(*serviceCache) + sc := newSegmentController[mockTSTable, mockTSTableOpener]( + ctx, tempDir, l, opts, nil, nil, time.Second, + fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"), opts.MemoryLimit), + serviceCache, group, + ) + + now := time.Now().UTC() + startTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) + endTime := startTime.Add(24 * time.Hour) + + segPath := filepath.Join(tempDir, "segment-"+startTime.Format(dayFormat)) + require.NoError(t, os.MkdirAll(segPath, DirPerm)) + require.NoError(t, os.WriteFile( + filepath.Join(segPath, metadataFilename), + []byte(currentVersion), FilePerm)) + + seg, err := sc.openSegment(ctx, startTime, endTime, segPath, + startTime.Format(dayFormat), sc.groupCache) + require.NoError(t, err) + + // Idle-close: drop the only reference so performCleanup runs and the + // segment ends up in the residual state seen on cold-tier nodes + // (refCount=0, s.index=nil, 
but segment still reachable for reopen). + seg.DecRef() + require.Nil(t, seg.index) + require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount)) + + indexDB := seg.IndexDB() + require.True(t, indexDB == nil, + "IndexDB() must return untyped nil after idle close so the caller's "+ + "`if indexDB == nil` guard fires") + + // Drive the exact caller pattern from collectSeriesIndexInfo. It must + // short-circuit on the nil check and never dispatch Stats() on a nil + // receiver. + var dataCount, dataSize int64 + func() { + defer func() { + if r := recover(); r != nil { + t.Fatalf("caller pattern must not panic but got: %v", r) + } + }() + if indexDB != nil { + dataCount, dataSize = indexDB.Stats() + } + }() + assert.Equal(t, int64(0), dataCount) + assert.Equal(t, int64(0), dataSize) +} + +// TestSegment_ConcurrentReopen_RefCountConsistent stress-tests the +// reference-count contract of segment.incRef under concurrent reopen. +// Before the fix, segment.initialize() returned without AddInt32 when +// another goroutine had already reopened the segment, so the second caller +// "owned" a reference it had never accounted for. The first DecRef would +// then trigger performCleanup while another goroutine was still using the +// segment. +// +// After the fix, every concurrent incRef must add exactly one reference +// regardless of which path it takes. +// +// Reliability note: a single 64-goroutine round can occasionally serialize +// well enough that only the first caller enters the slow path +// (initialize) and the rest take the fast path (CAS+1). That serialized +// shape would also pass on the buggy code. To make the test deterministic +// across machines, the experiment is repeated for `rounds` cycles, each +// starting from the residual state. The bug becomes effectively +// impossible to miss across all rounds combined. 
+func TestSegment_ConcurrentReopen_RefCountConsistent(t *testing.T) { + tempDir, cleanup := setupTestEnvironment(t) + defer cleanup() + + ctx := context.Background() + l := logger.GetLogger("test-concurrent-reopen") + ctx = context.WithValue(ctx, logger.ContextKey, l) + ctx = common.SetPosition(ctx, func(_ common.Position) common.Position { + return common.Position{Database: "test-db", Stage: "test-stage"} + }) + + opts := TSDBOpts[mockTSTable, mockTSTableOpener]{ + TSTableCreator: func(_ fs.FileSystem, _ string, _ common.Position, _ *logger.Logger, + _ timestamp.TimeRange, _ mockTSTableOpener, _ any, + ) (mockTSTable, error) { + return mockTSTable{ID: common.ShardID(0)}, nil + }, + ShardNum: 2, + SegmentInterval: IntervalRule{Unit: DAY, Num: 1}, + TTL: IntervalRule{Unit: DAY, Num: 7}, + SeriesIndexFlushTimeoutSeconds: 10, + SeriesIndexCacheMaxBytes: 1024 * 1024, + } + + serviceCache := NewServiceCache().(*serviceCache) + sc := newSegmentController[mockTSTable, mockTSTableOpener]( + ctx, tempDir, l, opts, nil, nil, time.Second, + fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"), opts.MemoryLimit), + serviceCache, group, + ) + + now := time.Now().UTC() + startTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) + endTime := startTime.Add(24 * time.Hour) + + segPath := filepath.Join(tempDir, "segment-"+startTime.Format(dayFormat)) + require.NoError(t, os.MkdirAll(segPath, DirPerm)) + require.NoError(t, os.WriteFile( + filepath.Join(segPath, metadataFilename), + []byte(currentVersion), FilePerm)) + + seg, err := sc.openSegment(ctx, startTime, endTime, segPath, + startTime.Format(dayFormat), sc.groupCache) + require.NoError(t, err) + + // Bring the segment into the cold-tier residual state. 
+ seg.DecRef() + require.Nil(t, seg.index) + require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount)) + + const ( + N = 64 + rounds = 10 + ) + for r := 0; r < rounds; r++ { + require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), + "round %d: segment must start at refCount=0", r) + require.Nil(t, seg.index, "round %d: segment.index must start nil", r) + + var ( + wg sync.WaitGroup + start = make(chan struct{}) + errCnt int64 + ) + wg.Add(N) + for i := 0; i < N; i++ { + go func() { + defer wg.Done() + <-start + if incErr := seg.incRef(ctx); incErr != nil { + atomic.AddInt64(&errCnt, 1) + } + }() + } + close(start) + wg.Wait() + + require.Equal(t, int64(0), atomic.LoadInt64(&errCnt), + "round %d: incRef must not error", r) + require.Equal(t, int32(N), atomic.LoadInt32(&seg.refCount), + "round %d: every concurrent incRef must add exactly one reference", r) + require.NotNil(t, seg.index, "round %d: segment.index must be reopened", r) + + // Drain references; the last DecRef must trigger performCleanup + // and bring the segment back to the residual state for the next + // round. + for i := 0; i < N; i++ { + seg.DecRef() + } + require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), + "round %d", r) + require.Nil(t, seg.index, + "round %d: segment.index must be cleared by performCleanup", r) + } +} + +// TestSegment_ConcurrentReopenAndClose_NoPanic exercises the cold-tier +// reopen pattern under heavy concurrency: many goroutines repeatedly call +// selectSegments, exercise IndexDB().Stats() on each returned segment, and +// release with DecRef. With idle-closed segments as the starting state, +// each iteration triggers initialize() to reopen the segment and the final +// DecRef triggers performCleanup() to close it again, so closes and +// reopens are interleaved across goroutines. 
+// +// Verifies (run with -race): +// - no goroutine panics (covers both the typed-nil and refcount fixes) +// - no data race is detected on the segment lifecycle +// - all segments end up cleanly closed (refCount == 0, index == nil) +func TestSegment_ConcurrentReopenAndClose_NoPanic(t *testing.T) { + tempDir, cleanup := setupTestEnvironment(t) + defer cleanup() + + ctx := context.Background() + l := logger.GetLogger("test-concurrent-reopen-close") + ctx = context.WithValue(ctx, logger.ContextKey, l) + ctx = common.SetPosition(ctx, func(_ common.Position) common.Position { + return common.Position{Database: "test-db", Stage: "test-stage"} + }) + + opts := TSDBOpts[mockTSTable, mockTSTableOpener]{ + TSTableCreator: func(_ fs.FileSystem, _ string, _ common.Position, _ *logger.Logger, + _ timestamp.TimeRange, _ mockTSTableOpener, _ any, + ) (mockTSTable, error) { + return mockTSTable{ID: common.ShardID(0)}, nil + }, + ShardNum: 2, + SegmentInterval: IntervalRule{Unit: DAY, Num: 1}, + TTL: IntervalRule{Unit: DAY, Num: 7}, + SeriesIndexFlushTimeoutSeconds: 10, + SeriesIndexCacheMaxBytes: 1024 * 1024, + } + + serviceCache := NewServiceCache().(*serviceCache) + sc := newSegmentController[mockTSTable, mockTSTableOpener]( + ctx, tempDir, l, opts, nil, nil, time.Second, + fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"), opts.MemoryLimit), + serviceCache, group, + ) + + now := time.Now().UTC() + day1 := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) + day2 := day1.Add(24 * time.Hour) + day3 := day2.Add(24 * time.Hour) + days := []time.Time{day1, day2, day3} + + for _, d := range days { + segPath := filepath.Join(tempDir, "segment-"+d.Format(dayFormat)) + require.NoError(t, os.MkdirAll(segPath, DirPerm)) + require.NoError(t, os.WriteFile( + filepath.Join(segPath, metadataFilename), + []byte(currentVersion), FilePerm)) + seg, openErr := sc.openSegment(ctx, d, d.Add(24*time.Hour), + segPath, d.Format(dayFormat), sc.groupCache) + 
require.NoError(t, openErr) + sc.Lock() + sc.lst = append(sc.lst, seg) + sc.sortLst() + sc.Unlock() + // Drop the open reference so each segment starts in the residual state. + seg.DecRef() + require.Nil(t, seg.index) + } + + timeRange := timestamp.NewInclusiveTimeRange(day1, day3.Add(24*time.Hour)) + + const ( + goroutines = 8 + cycles = 200 + ) + var ( + wg sync.WaitGroup + start = make(chan struct{}) + panicCnt int64 + ) + for i := 0; i < goroutines; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + defer func() { + if r := recover(); r != nil { + atomic.AddInt64(&panicCnt, 1) + t.Errorf("goroutine-%d panicked: %v", id, r) + } + }() + <-start + for c := 0; c < cycles; c++ { + segs, selErr := sc.selectSegments(timeRange) + if selErr != nil { + t.Errorf("goroutine-%d cycle %d selectSegments: %v", id, c, selErr) + return + } + for _, s := range segs { + if idx := s.IndexDB(); idx != nil { + _, _ = idx.Stats() + } + s.DecRef() + } + } + }(i) + } + close(start) + wg.Wait() + + require.Equal(t, int64(0), atomic.LoadInt64(&panicCnt), + "no goroutine should panic during the concurrent reopen/close cycle") + + sc.RLock() + final := append([]*segment[mockTSTable, mockTSTableOpener]{}, sc.lst...) 
+ sc.RUnlock() + for _, s := range final { + require.Equal(t, int32(0), atomic.LoadInt32(&s.refCount), + "segment %s leaked references", s.suffix) + require.Nil(t, s.index, "segment %s should be cleaned up", s.suffix) + } +} diff --git a/banyand/liaison/grpc/deletion.go b/banyand/liaison/grpc/deletion.go index 60edfd033..5bc457086 100644 --- a/banyand/liaison/grpc/deletion.go +++ b/banyand/liaison/grpc/deletion.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "strings" "sync" "time" @@ -158,11 +159,16 @@ func (m *groupDeletionTaskManager) startDeletion(ctx context.Context, group stri return fmt.Errorf("deletion task for group %s is already in progress", group) } - dataInfo, dataErr := m.schemaRegistry.CollectDataInfo(ctx, group) + dataInfo, collectionErrs, dataErr := m.schemaRegistry.CollectDataInfo(ctx, group) if dataErr != nil { m.tasks.Delete(group) return fmt.Errorf("failed to collect data info for group %s: %w", group, dataErr) } + if len(collectionErrs) > 0 { + m.tasks.Delete(group) + return fmt.Errorf("incomplete data info for group %s, refusing to start deletion: %s", + group, strings.Join(collectionErrs, "; ")) + } var totalDataSize int64 for _, di := range dataInfo { totalDataSize += di.GetDataSizeBytes() @@ -452,10 +458,14 @@ func (m *groupDeletionTaskManager) getDeletionTask(ctx context.Context, group st } func (m *groupDeletionTaskManager) hasNonEmptyResources(ctx context.Context, group string) (bool, error) { - dataInfo, dataErr := m.schemaRegistry.CollectDataInfo(ctx, group) + dataInfo, collectionErrs, dataErr := m.schemaRegistry.CollectDataInfo(ctx, group) if dataErr != nil { return false, fmt.Errorf("failed to collect data info: %w", dataErr) } + if len(collectionErrs) > 0 { + return false, fmt.Errorf("incomplete data info for group %s, cannot determine emptiness: %s", + group, strings.Join(collectionErrs, "; ")) + } for _, di := range dataInfo { if di.GetDataSizeBytes() > 0 { return true, nil diff --git 
a/banyand/liaison/grpc/deletion_test.go b/banyand/liaison/grpc/deletion_test.go index 88b810a7c..955a23abd 100644 --- a/banyand/liaison/grpc/deletion_test.go +++ b/banyand/liaison/grpc/deletion_test.go @@ -121,7 +121,7 @@ func TestHasNonEmptyResources(t *testing.T) { defer ctrl.Finish() mockRepo := metadata.NewMockRepo(ctrl) - mockRepo.EXPECT().CollectDataInfo(gomock.Any(), "test-group").Return(tt.infos, nil) + mockRepo.EXPECT().CollectDataInfo(gomock.Any(), "test-group").Return(tt.infos, nil, nil) m := &groupDeletionTaskManager{schemaRegistry: mockRepo} hasResources, checkErr := m.hasNonEmptyResources(context.Background(), "test-group") @@ -129,6 +129,30 @@ func TestHasNonEmptyResources(t *testing.T) { assert.Equal(t, tt.expected, hasResources) }) } + + // Partial-failure path: when CollectDataInfo reports per-node errors, + // hasNonEmptyResources must refuse to answer (returning an error) + // rather than silently treating the half-populated DataInfo as + // authoritative. A wrong "false" here would let the caller delete + // data on the silent node. 
+ t.Run("partial failure refuses to answer", func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockRepo := metadata.NewMockRepo(ctrl) + mockRepo.EXPECT().CollectDataInfo(gomock.Any(), "test-group").Return( + []*databasev1.DataInfo{{DataSizeBytes: 0}}, // looks empty + []string{"node error: cold-0 panic"}, // but cold-0 did not report + nil, + ) + + m := &groupDeletionTaskManager{schemaRegistry: mockRepo} + hasResources, checkErr := m.hasNonEmptyResources(context.Background(), "test-group") + require.Error(t, checkErr, "partial collection failure must surface as an error, not (false, nil)") + assert.Contains(t, checkErr.Error(), "incomplete data info") + assert.Contains(t, checkErr.Error(), "cold-0 panic") + assert.False(t, hasResources, "the bool return is irrelevant when the error is non-nil; caller must check err first") + }) } func TestDeletion(t *testing.T) { @@ -192,7 +216,7 @@ func TestDeletion(t *testing.T) { } mockRepo := metadata.NewMockRepo(ctrl) - mockRepo.EXPECT().CollectDataInfo(gomock.Any(), group).Return([]*databasev1.DataInfo{{DataSizeBytes: 512}}, nil) + mockRepo.EXPECT().CollectDataInfo(gomock.Any(), group).Return([]*databasev1.DataInfo{{DataSizeBytes: 512}}, nil, nil) mockRepo.EXPECT().IndexRuleBindingRegistry().Return(&stubIndexRuleBinding{}) mockRepo.EXPECT().IndexRuleRegistry().Return(&stubIndexRule{}) diff --git a/banyand/liaison/grpc/registry.go b/banyand/liaison/grpc/registry.go index 1e9af0c15..33977c4bc 100644 --- a/banyand/liaison/grpc/registry.go +++ b/banyand/liaison/grpc/registry.go @@ -20,6 +20,8 @@ package grpc import ( "context" "errors" + "fmt" + "strings" "time" "google.golang.org/grpc/codes" @@ -754,11 +756,16 @@ func (rs *groupRegistryServer) Inspect(ctx context.Context, req *databasev1.Grou rs.metrics.totalRegistryErr.Inc(1, g, "group", "inspect") return nil, schemaErr } - dataInfo, dataErr := rs.schemaRegistry.CollectDataInfo(ctx, g) + dataInfo, collectionErrs, dataErr := 
rs.schemaRegistry.CollectDataInfo(ctx, g) if dataErr != nil { rs.metrics.totalRegistryErr.Inc(1, g, "group", "inspect") return nil, dataErr } + if len(collectionErrs) > 0 { + rs.metrics.totalRegistryErr.Inc(1, g, "group", "inspect") + return nil, fmt.Errorf("incomplete data info for group %s: %s", + g, strings.Join(collectionErrs, "; ")) + } liaisonInfo, liaisonErr := rs.schemaRegistry.CollectLiaisonInfo(ctx, g) if liaisonErr != nil { rs.metrics.totalRegistryErr.Inc(1, g, "group", "inspect") diff --git a/banyand/measure/metadata_internal_test.go b/banyand/measure/metadata_internal_test.go new file mode 100644 index 000000000..87fa5db3a --- /dev/null +++ b/banyand/measure/metadata_internal_test.go @@ -0,0 +1,48 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/banyand/internal/storage" +) + +// nilIndexDBSegment is a stub Segment whose IndexDB() returns an untyped +// nil. The embedded interface lets us satisfy the Segment[*tsTable, option] +// signature without implementing every method (only IndexDB is exercised +// by collectSeriesIndexInfo). 
+type nilIndexDBSegment struct { + storage.Segment[*tsTable, option] +} + +func (nilIndexDBSegment) IndexDB() storage.IndexDB { return nil } + +// TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB locks in the contract +// that collectSeriesIndexInfo returns an empty SeriesIndexInfo (no panic) +// when the segment is in the cold-tier residual state where its underlying +// series index has been torn down by performCleanup. +func TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB(t *testing.T) { + sr := &schemaRepo{} + info := sr.collectSeriesIndexInfo(nilIndexDBSegment{}) + require.NotNil(t, info) + require.Equal(t, int64(0), info.DataCount) + require.Equal(t, int64(0), info.DataSizeBytes) +} diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go index 9ae0dd359..88b43bad7 100644 --- a/banyand/metadata/client.go +++ b/banyand/metadata/client.go @@ -665,7 +665,7 @@ func (s *clientService) Subjects(ctx context.Context, indexRule *databasev1.Inde return foundSubjects, subjectErr } -func (s *clientService) CollectDataInfo(ctx context.Context, group string) ([]*databasev1.DataInfo, error) { +func (s *clientService) CollectDataInfo(ctx context.Context, group string) ([]*databasev1.DataInfo, []string, error) { return s.infoCollectorRegistry.CollectDataInfo(ctx, group) } diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go index ac5b6508c..35a6644d8 100644 --- a/banyand/metadata/metadata.go +++ b/banyand/metadata/metadata.go @@ -53,7 +53,7 @@ type Repo interface { RegisterHandler(string, schema.Kind, schema.EventHandler) NodeRegistry() schema.Node PropertyRegistry() schema.Property - CollectDataInfo(context.Context, string) ([]*databasev1.DataInfo, error) + CollectDataInfo(context.Context, string) ([]*databasev1.DataInfo, []string, error) CollectLiaisonInfo(context.Context, string) ([]*databasev1.LiaisonInfo, error) DropGroup(ctx context.Context, catalog commonv1.Catalog, group string) error } diff --git a/banyand/metadata/schema/collector.go 
b/banyand/metadata/schema/collector.go index 9b297fd66..6ece113de 100644 --- a/banyand/metadata/schema/collector.go +++ b/banyand/metadata/schema/collector.go @@ -39,6 +39,20 @@ import ( // merge/flush load can take several seconds) while still bounding the call. const inspectBroadcastTimeout = 30 * time.Second +// Prefixes used by every producer that writes into +// GroupLifecycleInfo.errors so downstream consumers can categorize +// failures via a stable prefix match. The full vocabulary lives here: +// BroadcastErrorPrefix / FutureErrorPrefix / NodeErrorPrefix are written +// by broadcastCollectDataInfo below; TopLevelErrorPrefix is written by +// the InspectAll RPC handler in banyand/queue/sub/group_lifecycle.go, +// which observes the top-level error returned by CollectDataInfo. +const ( + BroadcastErrorPrefix = "broadcast failed: " + FutureErrorPrefix = "future error: " + NodeErrorPrefix = "node error: " + TopLevelErrorPrefix = "top-level: " +) + // GroupGetter provides method to get group metadata. type GroupGetter interface { GetGroup(ctx context.Context, group string) (*commonv1.Group, error) @@ -72,22 +86,29 @@ func NewInfoCollectorRegistry(l *logger.Logger, groupGetter GroupGetter) *InfoCo } } -// CollectDataInfo collects data information from both local and remote data nodes. -func (icr *InfoCollectorRegistry) CollectDataInfo(ctx context.Context, group string) ([]*databasev1.DataInfo, error) { +// CollectDataInfo collects data information from both local and remote data +// nodes. Returns the aggregated DataInfo slice plus a separate slice of +// per-node collection errors observed during broadcast (broadcast failure, +// future errors, listener-side common.Error responses); the two slices +// are independent -- the errors slice only contains failures and is not +// positionally aligned with DataInfo. The error return is reserved for +// top-level failures that prevent any aggregation (GetGroup failure, +// local collector failure, unsupported catalog). 
+func (icr *InfoCollectorRegistry) CollectDataInfo(ctx context.Context, group string) ([]*databasev1.DataInfo, []string, error) { g, getErr := icr.groupGetter.GetGroup(ctx, group) if getErr != nil { - return nil, getErr + return nil, nil, getErr } localInfo, localErr := icr.collectDataInfoLocal(ctx, g.Catalog, group) if localErr != nil { - return nil, localErr + return nil, nil, localErr } localInfoList := []*databasev1.DataInfo{} if localInfo != nil { localInfoList = []*databasev1.DataInfo{localInfo} } if icr.dataBroadcaster == nil { - return localInfoList, nil + return localInfoList, nil, nil } var topic bus.Topic @@ -99,25 +120,27 @@ func (icr *InfoCollectorRegistry) CollectDataInfo(ctx context.Context, group str case commonv1.Catalog_CATALOG_TRACE: topic = data.TopicTraceCollectDataInfo default: - return nil, fmt.Errorf("unsupported catalog type: %v", g.Catalog) + return nil, nil, fmt.Errorf("unsupported catalog type: %v", g.Catalog) } - remoteInfo := icr.broadcastCollectDataInfo(topic, group) - return append(localInfoList, remoteInfo...), nil + remoteInfo, collectionErrors := icr.broadcastCollectDataInfo(topic, group) + return append(localInfoList, remoteInfo...), collectionErrors, nil } -func (icr *InfoCollectorRegistry) broadcastCollectDataInfo(topic bus.Topic, group string) []*databasev1.DataInfo { +func (icr *InfoCollectorRegistry) broadcastCollectDataInfo(topic bus.Topic, group string) ([]*databasev1.DataInfo, []string) { message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &databasev1.GroupRegistryServiceInspectRequest{Group: group}) futures, broadcastErr := icr.dataBroadcaster.Broadcast(inspectBroadcastTimeout, topic, message) if broadcastErr != nil { icr.l.Warn().Err(broadcastErr).Str("group", group).Msg("failed to broadcast collect data info request") - return []*databasev1.DataInfo{} + return []*databasev1.DataInfo{}, []string{BroadcastErrorPrefix + broadcastErr.Error()} } dataInfoList := make([]*databasev1.DataInfo, 0, len(futures)) + var 
collectionErrors []string for _, future := range futures { msg, getErr := future.Get() if getErr != nil { icr.l.Warn().Err(getErr).Str("group", group).Msg("failed to get collect data info response") + collectionErrors = append(collectionErrors, FutureErrorPrefix+getErr.Error()) continue } msgData := msg.Data() @@ -128,9 +151,10 @@ func (icr *InfoCollectorRegistry) broadcastCollectDataInfo(topic bus.Topic, grou } case *common.Error: icr.l.Warn().Str("error", d.Error()).Str("group", group).Msg("error collecting data info from node") + collectionErrors = append(collectionErrors, NodeErrorPrefix+d.Error()) } } - return dataInfoList + return dataInfoList, collectionErrors } func (icr *InfoCollectorRegistry) collectDataInfoLocal(ctx context.Context, catalog commonv1.Catalog, group string) (*databasev1.DataInfo, error) { diff --git a/banyand/queue/sub/group_lifecycle.go b/banyand/queue/sub/group_lifecycle.go index 8104015da..d8d1f495b 100644 --- a/banyand/queue/sub/group_lifecycle.go +++ b/banyand/queue/sub/group_lifecycle.go @@ -24,6 +24,7 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" fodcv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" ) // maxInspectGroupConcurrency caps how many groups InspectAll will inspect @@ -77,12 +78,19 @@ func (s *server) inspectGroup(ctx context.Context, group *commonv1.Group) *fodcv Catalog: catalogToString(group.Catalog), ResourceOpts: group.ResourceOpts, } - dataInfo, err := s.metadataRepo.CollectDataInfo(ctx, groupName) + // CollectDataInfo's contract: a non-nil err means no DataInfo and nil + // collectionErrs (per banyand/metadata/schema/collector.go); a nil err + // means DataInfo is populated and collectionErrs may carry per-node + // broadcast failures. The two paths never overlap, so the top-level + // branch sets a fresh single-element Errors slice. 
+ dataInfo, collectionErrs, err := s.metadataRepo.CollectDataInfo(ctx, groupName) if err != nil { s.log.Warn().Err(err).Str("group", groupName).Msg("Failed to collect data info") - } else { - info.DataInfo = dataInfo + info.Errors = []string{schema.TopLevelErrorPrefix + err.Error()} + return info } + info.DataInfo = dataInfo + info.Errors = collectionErrs return info } diff --git a/banyand/queue/sub/group_lifecycle_cold_tier_test.go b/banyand/queue/sub/group_lifecycle_cold_tier_test.go new file mode 100644 index 000000000..46beacd4c --- /dev/null +++ b/banyand/queue/sub/group_lifecycle_cold_tier_test.go @@ -0,0 +1,298 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package sub + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + fodcv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +// coldTierRepo is a metadata.Repo stub modeled on cold-tier data nodes. +// CollectDataInfo simulates the post-fix output of schemaRepo.collectSeriesIndexInfo +// for idle-closed segments: returns DataInfo whose SegmentInfo entries each carry +// an empty SeriesIndexInfo (DataCount=0, DataSizeBytes=0). The hook lets tests +// inject failures, latency, or per-call mutation to mimic the cold-tier +// "InspectAll while closeIdleSegments is firing" race window. 
+type coldTierRepo struct { + metadata.Repo + groupRegistry *mockGroupRegistry + hook func(group string, callIdx int) ([]*databasev1.DataInfo, []string, error) + callsByGroup sync.Map // map[string]*atomic.Int64 + totalCalls atomic.Int64 + concurrentNow atomic.Int32 + concurrentMax atomic.Int32 + panicCount atomic.Int32 +} + +func (r *coldTierRepo) GroupRegistry() schema.Group { + return r.groupRegistry +} + +func (r *coldTierRepo) CollectDataInfo(_ context.Context, group string) (out []*databasev1.DataInfo, collectionErrs []string, err error) { + defer func() { + if rec := recover(); rec != nil { + r.panicCount.Add(1) + err = fmt.Errorf("panic in CollectDataInfo: %v", rec) + } + }() + r.totalCalls.Add(1) + current := r.concurrentNow.Add(1) + defer r.concurrentNow.Add(-1) + for { + hi := r.concurrentMax.Load() + if current <= hi || r.concurrentMax.CompareAndSwap(hi, current) { + break + } + } + cntAny, _ := r.callsByGroup.LoadOrStore(group, new(atomic.Int64)) + idx := cntAny.(*atomic.Int64).Add(1) + if r.hook != nil { + return r.hook(group, int(idx-1)) + } + return coldTierEmptyDataInfo(), nil, nil +} + +// coldTierEmptyDataInfo mirrors the DataInfo shape the fixed schemaRepo +// returns when every selected segment is in the idle-closed residual state +// (s.index = nil): non-nil top-level DataInfo with one SegmentInfo entry whose +// SeriesIndexInfo is the zero value. 
+func coldTierEmptyDataInfo() []*databasev1.DataInfo { + return []*databasev1.DataInfo{ + { + SegmentInfo: []*databasev1.SegmentInfo{ + { + SegmentId: "seg-cold-tier-idle", + SeriesIndexInfo: &databasev1.SeriesIndexInfo{}, + ShardInfo: []*databasev1.ShardInfo{}, + }, + }, + DataSizeBytes: 0, + }, + } +} + +func newColdTierGroups(catalogs ...commonv1.Catalog) []*commonv1.Group { + groups := make([]*commonv1.Group, 0, len(catalogs)) + for i, cat := range catalogs { + groups = append(groups, &commonv1.Group{ + Metadata: &commonv1.Metadata{Name: fmt.Sprintf("cold_group_%d", i)}, + Catalog: cat, + ResourceOpts: &commonv1.ResourceOpts{ + ShardNum: 2, + SegmentInterval: &commonv1.IntervalRule{ + Unit: commonv1.IntervalRule_UNIT_DAY, Num: 1, + }, + Ttl: &commonv1.IntervalRule{ + Unit: commonv1.IntervalRule_UNIT_DAY, Num: 30, + }, + }, + }) + } + return groups +} + +// TestInspectAll_ColdTier_Integration simulates the cold-tier traffic +// pattern that triggered the production nil-pointer panic on +// demo-banyandb-data-cold-0. FODC InspectAll fan-out plus +// closeIdleSegments leave segments in the "refCount=0, index=nil" +// residual state. With the typed-nil and refcount fixes applied, +// schemaRepo.collectSeriesIndexInfo short-circuits and returns an empty +// SeriesIndexInfo rather than panicking. +// +// This test wires the real InspectAll RPC handler against a stub +// metadata.Repo that mirrors the post-fix output of the storage path, +// then drives four cold-tier scenarios end-to-end: +// +// A. single InspectAll after idle-close +// B. repeated InspectAll cycles (cold-tier reinspect) +// C. concurrent InspectAll calls +// D. 
concurrent InspectAll with simulated idle-close churn between calls +func TestInspectAll_ColdTier_Integration(t *testing.T) { + groups := newColdTierGroups( + commonv1.Catalog_CATALOG_MEASURE, + commonv1.Catalog_CATALOG_STREAM, + commonv1.Catalog_CATALOG_TRACE, + ) + + t.Run("A_SingleInspectAfterIdleClose", func(t *testing.T) { + repo := &coldTierRepo{groupRegistry: &mockGroupRegistry{groups: groups}} + s := &server{log: logger.GetLogger("cold-tier-A"), metadataRepo: repo} + + resp, err := s.InspectAll(context.Background(), &fodcv1.InspectAllRequest{}) + require.NoError(t, err) + require.NotNil(t, resp) + require.Len(t, resp.Groups, len(groups)) + + for _, g := range resp.Groups { + require.NotEmpty(t, g.DataInfo, "group %s should expose post-fix DataInfo", g.Name) + for _, di := range g.DataInfo { + require.NotNil(t, di) + for _, seg := range di.SegmentInfo { + require.NotNil(t, seg.SeriesIndexInfo, + "every segment must carry a non-nil (possibly empty) SeriesIndexInfo") + assert.Equal(t, int64(0), seg.SeriesIndexInfo.DataCount) + assert.Equal(t, int64(0), seg.SeriesIndexInfo.DataSizeBytes) + } + } + } + assert.Equal(t, int32(0), repo.panicCount.Load(), "no goroutine should panic") + }) + + t.Run("B_RepeatedInspectAll_ColdTierReinspect", func(t *testing.T) { + repo := &coldTierRepo{groupRegistry: &mockGroupRegistry{groups: groups}} + s := &server{log: logger.GetLogger("cold-tier-B"), metadataRepo: repo} + + const cycles = 25 + for i := 0; i < cycles; i++ { + resp, err := s.InspectAll(context.Background(), &fodcv1.InspectAllRequest{}) + require.NoError(t, err, "cycle %d", i) + require.Len(t, resp.Groups, len(groups), "cycle %d returned wrong group count", i) + } + + // Every group must have been visited exactly `cycles` times -- no state + // leaks across repeated InspectAll calls (this is the "再次获取", i.e. "fetch again", check). 
+ for _, g := range groups { + cntAny, ok := repo.callsByGroup.Load(g.Metadata.Name) + require.True(t, ok, "group %s missing call counter", g.Metadata.Name) + assert.Equal(t, int64(cycles), cntAny.(*atomic.Int64).Load(), + "group %s expected exactly %d calls", g.Metadata.Name, cycles) + } + assert.Equal(t, int64(cycles*len(groups)), repo.totalCalls.Load()) + assert.Equal(t, int32(0), repo.panicCount.Load()) + }) + + t.Run("C_ConcurrentInspectAll", func(t *testing.T) { + repo := &coldTierRepo{groupRegistry: &mockGroupRegistry{groups: groups}} + s := &server{log: logger.GetLogger("cold-tier-C"), metadataRepo: repo} + + const callers = 8 + var ( + wg sync.WaitGroup + start = make(chan struct{}) + panicCnt int64 + ) + for i := 0; i < callers; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + defer func() { + if r := recover(); r != nil { + atomic.AddInt64(&panicCnt, 1) + t.Errorf("caller-%d panicked: %v", id, r) + } + }() + <-start + resp, err := s.InspectAll(context.Background(), &fodcv1.InspectAllRequest{}) + if !assert.NoError(t, err, "caller-%d", id) { + return + } + if !assert.NotNil(t, resp, "caller-%d", id) { + return + } + assert.Len(t, resp.Groups, len(groups), "caller-%d", id) + }(i) + } + close(start) + wg.Wait() + + assert.Equal(t, int64(0), panicCnt, "no caller may panic") + assert.Equal(t, int32(0), repo.panicCount.Load()) + assert.Equal(t, int64(callers*len(groups)), repo.totalCalls.Load()) + // Per-group fan-out cap is 32, so at least two groups should have + // overlapped within at least one InspectAll invocation. + assert.GreaterOrEqual(t, repo.concurrentMax.Load(), int32(2), + "concurrent InspectAll should overlap at least two CollectDataInfo calls") + }) + + t.Run("D_ConcurrentInspectAll_WithIdleCloseChurn", func(t *testing.T) { + // hook simulates the cold-tier race window: every other call returns + // a "fresh idle close just happened" payload (still empty), and a few + // calls return a benign error to mimic transient disk pressure. 
+ brittleHook := func(group string, callIdx int) ([]*databasev1.DataInfo, []string, error) { + if callIdx%17 == 0 && callIdx > 0 { + return nil, nil, fmt.Errorf("simulated transient close error on %s call#%d", group, callIdx) + } + // Yield to amplify scheduling jitter, then return empty info. + time.Sleep(50 * time.Microsecond) + return coldTierEmptyDataInfo(), nil, nil + } + repo := &coldTierRepo{ + groupRegistry: &mockGroupRegistry{groups: groups}, + hook: brittleHook, + } + s := &server{log: logger.GetLogger("cold-tier-D"), metadataRepo: repo} + + const callers = 6 + const cycles = 20 + var ( + wg sync.WaitGroup + start = make(chan struct{}) + panicCnt int64 + ) + for i := 0; i < callers; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + defer func() { + if r := recover(); r != nil { + atomic.AddInt64(&panicCnt, 1) + t.Errorf("caller-%d panicked: %v", id, r) + } + }() + <-start + for c := 0; c < cycles; c++ { + resp, err := s.InspectAll(context.Background(), &fodcv1.InspectAllRequest{}) + // require.* would FailNow inside this goroutine, which + // is forbidden by the testing package; use assert and + // bail out early on the first failure. 
+ if !assert.NoError(t, err, "caller-%d cycle-%d", id, c) { + return + } + if !assert.NotNil(t, resp, "caller-%d cycle-%d", id, c) { + return + } + if !assert.Len(t, resp.Groups, len(groups), "caller-%d cycle-%d", id, c) { + return + } + } + }(i) + } + close(start) + wg.Wait() + + require.Equal(t, int64(0), panicCnt, "no caller may panic") + require.Equal(t, int32(0), repo.panicCount.Load(), + "CollectDataInfo must not panic under the cold-tier churn") + assert.GreaterOrEqual(t, repo.concurrentMax.Load(), int32(2), + "concurrent InspectAll under churn should overlap at least two CollectDataInfo calls") + }) +} diff --git a/banyand/queue/sub/group_lifecycle_test.go b/banyand/queue/sub/group_lifecycle_test.go index f3a5cd6ff..4f912b323 100644 --- a/banyand/queue/sub/group_lifecycle_test.go +++ b/banyand/queue/sub/group_lifecycle_test.go @@ -47,20 +47,21 @@ func (m *mockGroupRegistry) ListGroup(_ context.Context) ([]*commonv1.Group, err type mockMetadataRepo struct { metadata.Repo - groupRegistry *mockGroupRegistry - dataInfoMap map[string][]*databasev1.DataInfo - dataInfoErr map[string]error - slowGroups map[string]time.Duration - collectStarts atomic.Int32 - concurrentMax atomic.Int32 - concurrentNow atomic.Int32 + groupRegistry *mockGroupRegistry + dataInfoMap map[string][]*databasev1.DataInfo + dataInfoErr map[string]error + collectionErrors map[string][]string + slowGroups map[string]time.Duration + collectStarts atomic.Int32 + concurrentMax atomic.Int32 + concurrentNow atomic.Int32 } func (m *mockMetadataRepo) GroupRegistry() schema.Group { return m.groupRegistry } -func (m *mockMetadataRepo) CollectDataInfo(_ context.Context, group string) ([]*databasev1.DataInfo, error) { +func (m *mockMetadataRepo) CollectDataInfo(_ context.Context, group string) ([]*databasev1.DataInfo, []string, error) { m.collectStarts.Add(1) current := m.concurrentNow.Add(1) defer m.concurrentNow.Add(-1) @@ -77,13 +78,18 @@ func (m *mockMetadataRepo) CollectDataInfo(_ context.Context, 
group string) ([]* } if m.dataInfoErr != nil { if err, ok := m.dataInfoErr[group]; ok { - return nil, err + return nil, nil, err } } + var dataInfo []*databasev1.DataInfo if m.dataInfoMap != nil { - return m.dataInfoMap[group], nil + dataInfo = m.dataInfoMap[group] } - return nil, nil + var collectionErrs []string + if m.collectionErrors != nil { + collectionErrs = m.collectionErrors[group] + } + return dataInfo, collectionErrs, nil } func TestCatalogToString(t *testing.T) { @@ -264,4 +270,92 @@ func TestInspectAll_RunsGroupsInParallel(t *testing.T) { assert.GreaterOrEqual(t, repo.concurrentMax.Load(), int32(2), "at least two CollectDataInfo invocations must overlap; observed max concurrency was %d", repo.concurrentMax.Load()) + for _, g := range resp.Groups { + assert.Empty(t, g.Errors, "successful group %s must not carry errors", g.Name) + } +} + +// TestInspectAll_SurfacesTopLevelError asserts that a top-level +// CollectDataInfo error is reported through GroupLifecycleInfo.errors with +// a "top-level:" prefix and that DataInfo stays empty for that group. 
+func TestInspectAll_SurfacesTopLevelError(t *testing.T) { + repo := &mockMetadataRepo{ + groupRegistry: &mockGroupRegistry{ + groups: []*commonv1.Group{ + {Metadata: &commonv1.Metadata{Name: "ok_group"}, Catalog: commonv1.Catalog_CATALOG_MEASURE}, + {Metadata: &commonv1.Metadata{Name: "bad_group"}, Catalog: commonv1.Catalog_CATALOG_STREAM}, + }, + }, + dataInfoMap: map[string][]*databasev1.DataInfo{ + "ok_group": {{DataSizeBytes: 1024}}, + }, + dataInfoErr: map[string]error{ + "bad_group": fmt.Errorf("metadataRepo: simulated load failure"), + }, + } + s := &server{log: logger.GetLogger("test"), metadataRepo: repo} + + resp, err := s.InspectAll(context.Background(), &fodcv1.InspectAllRequest{}) + require.NoError(t, err) + require.Len(t, resp.Groups, 2) + + byName := map[string]*fodcv1.GroupLifecycleInfo{} + for _, g := range resp.Groups { + byName[g.Name] = g + } + + ok := byName["ok_group"] + require.NotNil(t, ok) + assert.Empty(t, ok.Errors, "ok_group must not carry errors") + assert.NotEmpty(t, ok.DataInfo) + + bad := byName["bad_group"] + require.NotNil(t, bad) + assert.Empty(t, bad.DataInfo, "failed group must not expose stale DataInfo") + require.NotEmpty(t, bad.Errors, "failed group must surface the CollectDataInfo error") + assert.Contains(t, bad.Errors[0], "top-level:", + "top-level failures must be tagged with the top-level prefix") + assert.Contains(t, bad.Errors[0], "simulated load failure", + "the original error message must be preserved") +} + +// TestInspectAll_PropagatesPartialFailureErrors asserts that per-node +// failures reported by mockMetadataRepo.collectionErrors are passed +// through to GroupLifecycleInfo.errors for the originating group, while +// DataInfo still carries the entries from nodes that succeeded. 
+func TestInspectAll_PropagatesPartialFailureErrors(t *testing.T) { + repo := &mockMetadataRepo{ + groupRegistry: &mockGroupRegistry{ + groups: []*commonv1.Group{ + {Metadata: &commonv1.Metadata{Name: "partial_group"}, Catalog: commonv1.Catalog_CATALOG_MEASURE}, + }, + }, + dataInfoMap: map[string][]*databasev1.DataInfo{ + "partial_group": {{DataSizeBytes: 512}}, + }, + collectionErrors: map[string][]string{ + "partial_group": { + "future error: rpc error: nil pointer dereference", + "node error: cold-0 panic", + }, + }, + } + s := &server{log: logger.GetLogger("test"), metadataRepo: repo} + + resp, err := s.InspectAll(context.Background(), &fodcv1.InspectAllRequest{}) + require.NoError(t, err) + require.Len(t, resp.Groups, 1) + g := resp.Groups[0] + assert.NotEmpty(t, g.DataInfo, "partial success must still expose the DataInfo entries it did get") + assert.Equal(t, + []string{ + "future error: rpc error: nil pointer dereference", + "node error: cold-0 panic", + }, + g.Errors, + "per-node collection errors must be passed through to GroupLifecycleInfo.errors") + for _, e := range g.Errors { + assert.NotContains(t, e, "top-level:", + "partial-success errors must not carry the top-level: prefix") + } } diff --git a/banyand/stream/metadata_internal_test.go b/banyand/stream/metadata_internal_test.go new file mode 100644 index 000000000..a292b6704 --- /dev/null +++ b/banyand/stream/metadata_internal_test.go @@ -0,0 +1,48 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package stream + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/banyand/internal/storage" +) + +// nilIndexDBSegment is a stub Segment whose IndexDB() returns an untyped +// nil. The embedded interface lets us satisfy the Segment[*tsTable, option] +// signature without implementing every method (only IndexDB is exercised +// by collectSeriesIndexInfo). +type nilIndexDBSegment struct { + storage.Segment[*tsTable, option] +} + +func (nilIndexDBSegment) IndexDB() storage.IndexDB { return nil } + +// TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB locks in the contract +// that collectSeriesIndexInfo returns an empty SeriesIndexInfo (no panic) +// when the segment is in the cold-tier residual state where its underlying +// series index has been torn down by performCleanup. +func TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB(t *testing.T) { + sr := &schemaRepo{} + info := sr.collectSeriesIndexInfo(nilIndexDBSegment{}) + require.NotNil(t, info) + require.Equal(t, int64(0), info.DataCount) + require.Equal(t, int64(0), info.DataSizeBytes) +} diff --git a/banyand/trace/metadata_internal_test.go b/banyand/trace/metadata_internal_test.go new file mode 100644 index 000000000..378078907 --- /dev/null +++ b/banyand/trace/metadata_internal_test.go @@ -0,0 +1,48 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. 
Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package trace + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/banyand/internal/storage" +) + +// nilIndexDBSegment is a stub Segment whose IndexDB() returns an untyped +// nil. The embedded interface lets us satisfy the Segment[*tsTable, option] +// signature without implementing every method (only IndexDB is exercised +// by collectSeriesIndexInfo). +type nilIndexDBSegment struct { + storage.Segment[*tsTable, option] +} + +func (nilIndexDBSegment) IndexDB() storage.IndexDB { return nil } + +// TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB locks in the contract +// that collectSeriesIndexInfo returns an empty SeriesIndexInfo (no panic) +// when the segment is in the cold-tier residual state where its underlying +// series index has been torn down by performCleanup. +func TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB(t *testing.T) { + sr := &schemaRepo{} + info := sr.collectSeriesIndexInfo(nilIndexDBSegment{}) + require.NotNil(t, info) + require.Equal(t, int64(0), info.DataCount) + require.Equal(t, int64(0), info.DataSizeBytes) +} diff --git a/docs/api-reference.md b/docs/api-reference.md index 4a5bc34e9..2c12c584f 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -4773,6 +4773,7 @@ Phase represents the current phase of the deletion task. 
| catalog | [string](#string) | | | | resource_opts | [banyandb.common.v1.ResourceOpts](#banyandb-common-v1-ResourceOpts) | | | | data_info | [banyandb.database.v1.DataInfo](#banyandb-database-v1-DataInfo) | repeated | | +| errors | [string](#string) | repeated | errors lists every failure observed while collecting data_info for this group: top-level CollectDataInfo failures (GetGroup, missing collector, dial failure -- prefixed "top-level: ") and per-node broadcast failures (prefixed "future error: ", "node error: ", "broadcast failed: "). Combined with len(data_info), consumers can tell the following four states apart: - data_info empty && errors empty -> no no [...] diff --git a/fodc/proxy/internal/lifecycle/manager.go b/fodc/proxy/internal/lifecycle/manager.go index 23c7bc9db..d17128b8c 100644 --- a/fodc/proxy/internal/lifecycle/manager.go +++ b/fodc/proxy/internal/lifecycle/manager.go @@ -20,6 +20,7 @@ package lifecycle import ( "context" + "sort" "sync" fodcv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1" @@ -258,16 +259,41 @@ func (m *Manager) buildInspectionResult(allData []*agentLifecycleData) *Inspecti func (m *Manager) mergeGroups(allData []*agentLifecycleData) []*fodcv1.GroupLifecycleInfo { groupMap := make(map[string]*fodcv1.GroupLifecycleInfo) + // Errors are unioned across agents because each agent may observe a + // different subset of per-node failures (e.g. liaison-0 sees cold-0 + // time out while liaison-1 sees cold-0 panic). Last-wins on the rest + // of the GroupLifecycleInfo is fine -- name/catalog/resource_opts are + // agent-invariant -- but errors must be the deduped union. 
+ mergedErrors := make(map[string]map[string]struct{}) for _, ad := range allData { if ad == nil || ad.Data == nil { continue } for _, g := range ad.Data.Groups { groupMap[g.Name] = g + if len(g.Errors) == 0 { + continue + } + set, ok := mergedErrors[g.Name] + if !ok { + set = make(map[string]struct{}) + mergedErrors[g.Name] = set + } + for _, e := range g.Errors { + set[e] = struct{}{} + } } } groups := make([]*fodcv1.GroupLifecycleInfo, 0, len(groupMap)) - for _, g := range groupMap { + for name, g := range groupMap { + if set := mergedErrors[name]; len(set) > 0 { + errs := make([]string, 0, len(set)) + for e := range set { + errs = append(errs, e) + } + sort.Strings(errs) + g.Errors = errs + } groups = append(groups, g) } return groups diff --git a/fodc/proxy/internal/lifecycle/manager_test.go b/fodc/proxy/internal/lifecycle/manager_test.go index 55a76f166..3f1117705 100644 --- a/fodc/proxy/internal/lifecycle/manager_test.go +++ b/fodc/proxy/internal/lifecycle/manager_test.go @@ -394,3 +394,64 @@ func TestManager_CollectLifecycle_ContextCanceled(t *testing.T) { result, _ := mgr.CollectLifecycle(ctx) require.NotNil(t, result) } + +// TestManager_MergeGroups_UnionsErrorsAcrossAgents verifies that when +// multiple agents report the same group, the errors observed by each +// agent are unioned (deduped, sorted) rather than overwritten last-wins, +// so downstream consumers see every per-node failure regardless of which +// agent's view they happened to land on. 
+func TestManager_MergeGroups_UnionsErrorsAcrossAgents(t *testing.T) { + log := initTestLogger(t) + mgr := NewManager(nil, nil, log) + + groupName := "sw_metric" + agentA := &agentLifecycleData{ + PodName: "liaison-0", + Data: &fodcv1.LifecycleData{ + Groups: []*fodcv1.GroupLifecycleInfo{{ + Name: groupName, + Catalog: "CATALOG_MEASURE", + Errors: []string{ + "node error: cold-0 panic", + "future error: rpc deadline", + }, + }}, + }, + } + agentB := &agentLifecycleData{ + PodName: "liaison-1", + Data: &fodcv1.LifecycleData{ + Groups: []*fodcv1.GroupLifecycleInfo{{ + Name: groupName, + Catalog: "CATALOG_MEASURE", + Errors: []string{ + "node error: cold-0 panic", // duplicate of A's first + "broadcast failed: dial timeout", + }, + }}, + }, + } + agentC := &agentLifecycleData{ + PodName: "liaison-2", + Data: &fodcv1.LifecycleData{ + Groups: []*fodcv1.GroupLifecycleInfo{{ + Name: groupName, + Catalog: "CATALOG_MEASURE", + // no errors + }}, + }, + } + + merged := mgr.mergeGroups([]*agentLifecycleData{agentA, agentB, agentC}) + require.Len(t, merged, 1) + g := merged[0] + assert.Equal(t, groupName, g.Name) + assert.Equal(t, + []string{ + "broadcast failed: dial timeout", + "future error: rpc deadline", + "node error: cold-0 panic", + }, + g.Errors, + "errors must be the deduped, sorted union of every agent's view") +}
