This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch v0.10.x in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 5cfe59a124779b5cbb3fdfadcf4066809fa183c5 Author: mrproliu <[email protected]> AuthorDate: Thu Apr 30 10:11:53 2026 +0800 Fix FODC lifecycle endpoint wrong encoding and increase the gRPC timeout between agent and proxy (#1103) --- CHANGES.md | 2 + fodc/agent/internal/lifecycle/collector.go | 56 +++-- fodc/agent/internal/lifecycle/collector_test.go | 263 ++++++++++++++++++++++-- fodc/internal/timeouts/timeouts.go | 34 +++ fodc/proxy/internal/api/server.go | 66 +++++- fodc/proxy/internal/api/server_test.go | 196 ++++++++++++++++++ fodc/proxy/internal/lifecycle/manager.go | 9 +- 7 files changed, 584 insertions(+), 42 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index d5b5ab50d..6356ba45e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -29,6 +29,8 @@ Release Notes. - Avoid FODC lifecycle inspection failing on busy data nodes by raising the per-broadcast `CollectDataInfo` / `CollectLiaisonInfo` deadline from 5s to 30s and parallelizing per-group inspection in the cluster-internal `InspectAll`. - Fix flaky `file_snapshot` subtest in measure/stream/trace by waiting until every introduced mem part has been flushed to disk, instead of only checking the latest snapshot creator. - Fix flaky `TestCollectWithPartialClosedSegments` by raising `SegmentIdleTimeout` so wall-clock variance on slow CI does not mark still-open segments as idle. +- Fix FODC lifecycle cache poisoning where transient `InspectAll` failures were cached for 10 minutes and masked liaison recovery; raise the FODC agent `InspectAll` timeout from 10s to 40s and the proxy per-agent collection timeout from 10s to 50s (the agent timeout plus 10s of slack). +- Fix FODC `/cluster/lifecycle` dropping zero-valued group fields (e.g. `replicas=0`, `close=false`) under `encoding/json` + `omitempty`; switch to `protojson` so all fields are emitted (nil nested messages serialize as `null`). 
### Chores diff --git a/fodc/agent/internal/lifecycle/collector.go b/fodc/agent/internal/lifecycle/collector.go index 6b9c559ff..1f94636dc 100644 --- a/fodc/agent/internal/lifecycle/collector.go +++ b/fodc/agent/internal/lifecycle/collector.go @@ -20,6 +20,7 @@ package lifecycle import ( "context" + "fmt" "os" "path/filepath" "sort" @@ -34,6 +35,7 @@ import ( "google.golang.org/grpc/status" fodcv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1" + "github.com/apache/skywalking-banyandb/fodc/internal/timeouts" "github.com/apache/skywalking-banyandb/pkg/logger" ) @@ -44,8 +46,6 @@ const ( maxReportFileSize = 5 * 1024 * 1024 // 5MB ) -const grpcTimeout = 10 * time.Second - // Collector collects lifecycle data from local files. type Collector struct { lastCollectTime time.Time @@ -73,8 +73,10 @@ func NewCollector(log *logger.Logger, grpcAddr, reportDir string, cacheTTL time. } } -// Collect collects lifecycle data from local files. -// Data is cached for cacheTTL duration; after expiry the next call refreshes the cache. +// Collect returns lifecycle data, serving the cache when fresh. A transient InspectAll +// failure is propagated to the caller (the cache is left untouched so the next call +// retries) so that callers can distinguish a real failure from a healthy liaison that +// happens to have zero groups. func (c *Collector) Collect(ctx context.Context) (*fodcv1.LifecycleData, error) { c.mu.RLock() if c.currentData != nil && c.cacheTTL > 0 && c.nowFunc().Sub(c.lastCollectTime) < c.cacheTTL { @@ -84,9 +86,19 @@ func (c *Collector) Collect(ctx context.Context) (*fodcv1.LifecycleData, error) } c.mu.RUnlock() + reports := c.readReportFiles() + groups, err := c.collectGroups(ctx) + if err != nil { + if c.log != nil { + c.log.Warn().Err(err).Int("reports", len(reports)). 
+ Msg("InspectAll failed; cache untouched, propagating error to caller") + } + return nil, err + } + data := &fodcv1.LifecycleData{ - Reports: c.readReportFiles(), - Groups: c.collectGroups(ctx), + Reports: reports, + Groups: groups, } c.mu.Lock() c.currentData = data @@ -95,23 +107,28 @@ func (c *Collector) Collect(ctx context.Context) (*fodcv1.LifecycleData, error) return data, nil } -func (c *Collector) collectGroups(ctx context.Context) []*fodcv1.GroupLifecycleInfo { +// collectGroups invokes InspectAll on the local liaison. +// +// Return contract: +// - (nil, nil): no RPC was issued (grpcAddr empty, or InspectAll already known to be Unimplemented). +// Callers treat this as a successful collection with no groups. +// - (non-nil slice, nil): RPC succeeded. The slice is empty (non-nil) when the server returned no groups. +// - (nil, err): a transient error occurred (DeadlineExceeded, Unavailable, dial failure, etc.). +// Callers must NOT cache this outcome. +func (c *Collector) collectGroups(ctx context.Context) ([]*fodcv1.GroupLifecycleInfo, error) { if c.grpcAddr == "" || c.grpcUnimplemented.Load() { - return nil + return nil, nil } conn, err := grpc.NewClient(c.grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - if c.log != nil { - c.log.Warn().Err(err).Str("addr", c.grpcAddr).Msg("Failed to create gRPC connection for InspectAll") - } - return nil + return nil, fmt.Errorf("dial %s: %w", c.grpcAddr, err) } defer func() { _ = conn.Close() }() client := fodcv1.NewGroupLifecycleServiceClient(conn) - reqCtx, cancel := context.WithTimeout(ctx, grpcTimeout) + reqCtx, cancel := context.WithTimeout(ctx, timeouts.AgentInspectAll) defer cancel() resp, err := client.InspectAll(reqCtx, &fodcv1.InspectAllRequest{}) if err != nil { @@ -120,14 +137,15 @@ func (c *Collector) collectGroups(ctx context.Context) []*fodcv1.GroupLifecycleI if c.log != nil { c.log.Info().Str("addr", c.grpcAddr).Msg("GroupLifecycleService.InspectAll is not 
implemented, skipping future calls") } - return nil + return nil, nil } - if c.log != nil { - c.log.Warn().Err(err).Str("addr", c.grpcAddr).Msg("InspectAll failed, skipping groups") - } - return nil + return nil, fmt.Errorf("InspectAll on %s: %w", c.grpcAddr, err) + } + got := resp.GetGroups() + if got == nil { + return []*fodcv1.GroupLifecycleInfo{}, nil } - return resp.GetGroups() + return got, nil } func (c *Collector) readReportFiles() []*fodcv1.LifecycleReport { diff --git a/fodc/agent/internal/lifecycle/collector_test.go b/fodc/agent/internal/lifecycle/collector_test.go index 0be13c09a..ab44d5053 100644 --- a/fodc/agent/internal/lifecycle/collector_test.go +++ b/fodc/agent/internal/lifecycle/collector_test.go @@ -18,14 +18,21 @@ package lifecycle import ( + "context" + "net" "os" "path/filepath" + "sync" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + fodcv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1" "github.com/apache/skywalking-banyandb/pkg/logger" ) @@ -36,6 +43,73 @@ func initTestLogger(t *testing.T) *logger.Logger { return logger.GetLogger("test", "lifecycle") } +// fakeLifecycleService is a programmable test double for GroupLifecycleService.InspectAll. +// Each registered behavior is consumed in order; the call counter lets tests assert how +// many times Collect actually reached the gRPC server (i.e. whether the cache served the +// request without an RPC). 
+type fakeLifecycleService struct { + fodcv1.UnimplementedGroupLifecycleServiceServer + behaviors []func() (*fodcv1.InspectAllResponse, error) + mu sync.Mutex + callCount int +} + +func (f *fakeLifecycleService) InspectAll(_ context.Context, _ *fodcv1.InspectAllRequest) (*fodcv1.InspectAllResponse, error) { + f.mu.Lock() + defer f.mu.Unlock() + if f.callCount >= len(f.behaviors) { + f.callCount++ + return nil, status.Error(codes.Internal, "fakeLifecycleService: behavior list exhausted") + } + behavior := f.behaviors[f.callCount] + f.callCount++ + return behavior() +} + +func (f *fakeLifecycleService) calls() int { + f.mu.Lock() + defer f.mu.Unlock() + return f.callCount +} + +func returnGroups(groups []*fodcv1.GroupLifecycleInfo) func() (*fodcv1.InspectAllResponse, error) { + return func() (*fodcv1.InspectAllResponse, error) { + return &fodcv1.InspectAllResponse{Groups: groups}, nil + } +} + +func returnError(code codes.Code, msg string) func() (*fodcv1.InspectAllResponse, error) { + return func() (*fodcv1.InspectAllResponse, error) { + return nil, status.Error(code, msg) + } +} + +// startLocalServer spins up a real gRPC server on a random localhost port hosting the +// supplied fake and returns the listen address. The server is torn down via t.Cleanup. +func startLocalServer(t *testing.T, fake *fakeLifecycleService) string { + t.Helper() + lis, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + srv := grpc.NewServer() + fodcv1.RegisterGroupLifecycleServiceServer(srv, fake) + go func() { + _ = srv.Serve(lis) + }() + t.Cleanup(srv.Stop) + return lis.Addr().String() +} + +// newCollectorForFake constructs a Collector wired to a fresh local gRPC server backed by fake. 
+func newCollectorForFake(t *testing.T, fake *fakeLifecycleService, cacheTTL time.Duration) *Collector { + t.Helper() + addr := startLocalServer(t, fake) + return NewCollector(initTestLogger(t), addr, t.TempDir(), cacheTTL) +} + +func sampleGroup(name string) *fodcv1.GroupLifecycleInfo { + return &fodcv1.GroupLifecycleInfo{Name: name} +} + func TestNewCollector(t *testing.T) { log := initTestLogger(t) collector := NewCollector(log, "", "", 10*time.Minute) @@ -51,11 +125,10 @@ func TestCollector_ReadReportFiles_EmptyDir(t *testing.T) { assert.Empty(t, reports) } -func TestCollector_Collect(t *testing.T) { +func TestCollector_Collect_NoGRPC(t *testing.T) { log := initTestLogger(t) collector := NewCollector(log, "", "", 0) - ctx := t.Context() - data, err := collector.Collect(ctx) + data, err := collector.Collect(t.Context()) require.NoError(t, err) require.NotNil(t, data) assert.NotNil(t, collector.currentData) @@ -88,20 +161,19 @@ func TestCollector_CacheTTL(t *testing.T) { assert.NotSame(t, data1, data3) } -func TestCollector_GrpcUnimplemented_SkipsFutureCalls(t *testing.T) { +func TestCollector_ZeroTTL_AlwaysRefreshes(t *testing.T) { log := initTestLogger(t) - collector := NewCollector(log, "localhost:0", "", 0) - - assert.False(t, collector.grpcUnimplemented.Load()) + collector := NewCollector(log, "", "", 0) + ctx := t.Context() - groups := collector.collectGroups(t.Context()) - assert.Nil(t, groups) - assert.False(t, collector.grpcUnimplemented.Load()) + data1, err := collector.Collect(ctx) + require.NoError(t, err) + require.NotNil(t, data1) - collector.grpcUnimplemented.Store(true) - groups = collector.collectGroups(t.Context()) - assert.Nil(t, groups) - assert.True(t, collector.grpcUnimplemented.Load()) + data2, err := collector.Collect(ctx) + require.NoError(t, err) + require.NotNil(t, data2) + assert.NotSame(t, data1, data2) } func TestCollector_ReadReportFiles_MaxFiles(t *testing.T) { @@ -109,7 +181,6 @@ func TestCollector_ReadReportFiles_MaxFiles(t 
*testing.T) { dir := t.TempDir() collector := NewCollector(log, "", dir, 0) - // Create 7 files with timestamp-like names so sort order is predictable for idx, name := range []string{ "2026-03-20.json", "2026-03-21.json", @@ -165,18 +236,172 @@ func TestCollector_ReadReportFiles_OversizedFile(t *testing.T) { assert.Equal(t, "2026-03-26.json", reports[0].Filename) } -func TestCollector_ZeroTTL_AlwaysRefreshes(t *testing.T) { +func TestCollectGroups_NoGRPCAddrShortCircuits(t *testing.T) { log := initTestLogger(t) collector := NewCollector(log, "", "", 0) + groups, err := collector.collectGroups(t.Context()) + require.NoError(t, err) + assert.Nil(t, groups) +} + +func TestCollectGroups_UnimplementedSetsFlagAndCachesNil(t *testing.T) { + fake := &fakeLifecycleService{ + behaviors: []func() (*fodcv1.InspectAllResponse, error){ + returnError(codes.Unimplemented, "not supported"), + }, + } + collector := newCollectorForFake(t, fake, time.Minute) + + groups, err := collector.collectGroups(t.Context()) + require.NoError(t, err) + assert.Nil(t, groups) + assert.True(t, collector.grpcUnimplemented.Load()) + + // Subsequent direct calls short-circuit without touching the server. 
+ groups2, err := collector.collectGroups(t.Context()) + require.NoError(t, err) + assert.Nil(t, groups2) + assert.Equal(t, 1, fake.calls()) +} + +func TestCollectGroups_NilResponseGroupsBecomesEmptySlice(t *testing.T) { + fake := &fakeLifecycleService{ + behaviors: []func() (*fodcv1.InspectAllResponse, error){ + returnGroups(nil), + }, + } + collector := newCollectorForFake(t, fake, time.Minute) + + groups, err := collector.collectGroups(t.Context()) + require.NoError(t, err) + require.NotNil(t, groups, "successful InspectAll with empty groups must return non-nil empty slice, not nil") + assert.Empty(t, groups) +} + +func TestCollect_CachesSuccessfulResult(t *testing.T) { + fake := &fakeLifecycleService{ + behaviors: []func() (*fodcv1.InspectAllResponse, error){ + returnGroups([]*fodcv1.GroupLifecycleInfo{sampleGroup("g1"), sampleGroup("g2")}), + }, + } + collector := newCollectorForFake(t, fake, time.Minute) ctx := t.Context() data1, err := collector.Collect(ctx) require.NoError(t, err) - require.NotNil(t, data1) + require.Len(t, data1.Groups, 2) // With zero TTL, every call should re-collect data2, err := collector.Collect(ctx) require.NoError(t, err) - require.NotNil(t, data2) - assert.NotSame(t, data1, data2) + assert.Same(t, data1, data2, "second Collect within TTL must return the cached pointer") + assert.Equal(t, 1, fake.calls(), "InspectAll must not be invoked on cache hit") +} + +func TestCollect_DoesNotCacheTransientFailure(t *testing.T) { + fake := &fakeLifecycleService{ + behaviors: []func() (*fodcv1.InspectAllResponse, error){ + returnError(codes.DeadlineExceeded, "simulated timeout"), + returnGroups([]*fodcv1.GroupLifecycleInfo{sampleGroup("g1")}), + }, + } + collector := newCollectorForFake(t, fake, 10*time.Minute) + ctx := t.Context() + + data1, err := collector.Collect(ctx) + require.Error(t, err, "transient failure must propagate to caller") + assert.Nil(t, data1, "no LifecycleData on failure") + + collector.mu.RLock() + cachedData := 
collector.currentData + cachedTime := collector.lastCollectTime + collector.mu.RUnlock() + assert.Nil(t, cachedData, "transient failure must not write to cache") + assert.True(t, cachedTime.IsZero(), "lastCollectTime must remain zero after transient failure") + + data2, err := collector.Collect(ctx) + require.NoError(t, err) + require.Len(t, data2.Groups, 1, "next call must retry InspectAll, not return stale empty") + assert.Equal(t, "g1", data2.Groups[0].Name) + assert.Equal(t, 2, fake.calls(), "InspectAll must be invoked twice") +} + +func TestCollect_FailurePropagatesError(t *testing.T) { + successGroups := []*fodcv1.GroupLifecycleInfo{sampleGroup("g1"), sampleGroup("g2"), sampleGroup("g3")} + fake := &fakeLifecycleService{ + behaviors: []func() (*fodcv1.InspectAllResponse, error){ + returnGroups(successGroups), + returnError(codes.DeadlineExceeded, "simulated timeout"), + }, + } + collector := newCollectorForFake(t, fake, 0) // zero TTL forces both calls to hit the server + ctx := t.Context() + + data1, err := collector.Collect(ctx) + require.NoError(t, err) + require.Len(t, data1.Groups, 3) + + data2, err := collector.Collect(ctx) + require.Error(t, err, "transient failure after a successful collect must still surface as an error") + assert.Nil(t, data2, "caller must not receive a LifecycleData payload on failure") + assert.Equal(t, 2, fake.calls()) +} + +func TestCollect_TransientFailureDoesNotAdvanceLastCollectTime(t *testing.T) { + fake := &fakeLifecycleService{ + behaviors: []func() (*fodcv1.InspectAllResponse, error){ + returnGroups([]*fodcv1.GroupLifecycleInfo{sampleGroup("g1")}), + returnError(codes.Unavailable, "simulated outage"), + returnGroups([]*fodcv1.GroupLifecycleInfo{sampleGroup("g1"), sampleGroup("g2")}), + }, + } + collector := newCollectorForFake(t, fake, time.Hour) + ctx := t.Context() + + now := time.Unix(1_000_000_000, 0) + collector.nowFunc = func() time.Time { return now } + + _, err := collector.Collect(ctx) + require.NoError(t, err) + 
collector.mu.RLock() + successTime := collector.lastCollectTime + collector.mu.RUnlock() + assert.Equal(t, now, successTime) + + now = now.Add(2 * time.Hour) // expire the cache to force a new RPC + _, err = collector.Collect(ctx) + require.Error(t, err, "transient failure must surface as an error to the caller") + collector.mu.RLock() + failureTime := collector.lastCollectTime + collector.mu.RUnlock() + assert.Equal(t, successTime, failureTime, "transient failure must not advance lastCollectTime") + + data, err := collector.Collect(ctx) + require.NoError(t, err) + require.Len(t, data.Groups, 2, "after recovery, fresh groups must be cached") + assert.Equal(t, 3, fake.calls()) +} + +func TestCollect_UnimplementedCachesAndStops(t *testing.T) { + fake := &fakeLifecycleService{ + behaviors: []func() (*fodcv1.InspectAllResponse, error){ + returnError(codes.Unimplemented, "not supported"), + }, + } + collector := newCollectorForFake(t, fake, 10*time.Minute) + ctx := t.Context() + + data1, err := collector.Collect(ctx) + require.NoError(t, err) + assert.Nil(t, data1.Groups) + assert.True(t, collector.grpcUnimplemented.Load()) + + collector.mu.RLock() + require.NotNil(t, collector.currentData, "Unimplemented is a known terminal state and must be cached") + collector.mu.RUnlock() + + data2, err := collector.Collect(ctx) + require.NoError(t, err) + assert.Same(t, data1, data2, "subsequent calls within TTL must hit cache") + assert.Equal(t, 1, fake.calls(), "Unimplemented must short-circuit subsequent RPCs") } diff --git a/fodc/internal/timeouts/timeouts.go b/fodc/internal/timeouts/timeouts.go new file mode 100644 index 000000000..08d9e958c --- /dev/null +++ b/fodc/internal/timeouts/timeouts.go @@ -0,0 +1,34 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. 
Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package timeouts holds timeout constants shared between the FODC agent and proxy. +// Co-locating these constants ensures the proxy-side per-agent collection deadline is +// always strictly greater than the agent-side InspectAll timeout, regardless of who +// imports them. +package timeouts + +import "time" + +// AgentInspectAll bounds how long a single InspectAll call against the local liaison +// may run on the FODC agent side. +const AgentInspectAll = 40 * time.Second + +// ProxySlack is the additional time the proxy waits, on top of AgentInspectAll, before +// declaring an agent unresponsive. It must be strictly greater than zero so the proxy +// always outlasts the agent's own deadline and never gives up while a slow but still +// progressing InspectAll call is in flight. 
+const ProxySlack = 10 * time.Second diff --git a/fodc/proxy/internal/api/server.go b/fodc/proxy/internal/api/server.go index 3ff5de920..f8c9338fa 100644 --- a/fodc/proxy/internal/api/server.go +++ b/fodc/proxy/internal/api/server.go @@ -28,6 +28,9 @@ import ( "strings" "time" + "google.golang.org/protobuf/encoding/protojson" + + fodcv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1" "github.com/apache/skywalking-banyandb/fodc/proxy/internal/cluster" "github.com/apache/skywalking-banyandb/fodc/proxy/internal/lifecycle" "github.com/apache/skywalking-banyandb/fodc/proxy/internal/metrics" @@ -35,6 +38,15 @@ import ( "github.com/apache/skywalking-banyandb/pkg/logger" ) +// lifecycleGroupMarshaler emits zero-value protobuf fields (replicas=0, close=false, empty +// stages, etc.) as explicit JSON keys instead of dropping them. The default encoding/json + +// protoc-gen-go combination would silence those fields via `omitempty`, leaving downstream +// consumers (SRE agent) unable to tell "field absent" from "value is zero". +var lifecycleGroupMarshaler = protojson.MarshalOptions{ + UseProtoNames: true, + EmitUnpopulated: true, +} + // Server exposes REST and Prometheus-style endpoints for external consumption. 
type Server struct { metricsAggregator *metrics.Aggregator @@ -496,9 +508,23 @@ func (s *Server) handleClusterLifecycle(w http.ResponseWriter, r *http.Request) } lifecycleData, agentSummary := s.lifecycleManager.CollectLifecycle(r.Context()) + groupsJSON, err := marshalLifecycleGroups(lifecycleData.Groups) + if err != nil { + s.logger.Error().Err(err).Msg("Failed to marshal lifecycle groups") + http.Error(w, "Failed to serialize lifecycle groups", http.StatusInternalServerError) + return + } + + statusesJSON, err := marshalLifecycleStatuses(lifecycleData.LifecycleStatuses) + if err != nil { + s.logger.Error().Err(err).Msg("Failed to marshal lifecycle statuses") + http.Error(w, "Failed to serialize lifecycle statuses", http.StatusInternalServerError) + return + } + body := map[string]interface{}{ - "groups": lifecycleData.Groups, - "lifecycle_statuses": lifecycleData.LifecycleStatuses, + "groups": groupsJSON, + "lifecycle_statuses": statusesJSON, "agent_summary": agentSummary, } switch { @@ -519,6 +545,42 @@ func (s *Server) handleClusterLifecycle(w http.ResponseWriter, r *http.Request) } } +func marshalLifecycleGroups(groups []*fodcv1.GroupLifecycleInfo) ([]json.RawMessage, error) { + out := make([]json.RawMessage, 0, len(groups)) + for _, g := range groups { + raw, err := lifecycleGroupMarshaler.Marshal(g) + if err != nil { + return nil, fmt.Errorf("marshal group %q: %w", g.GetName(), err) + } + out = append(out, raw) + } + return out, nil +} + +// lifecycleStatusJSON mirrors lifecycle.PodLifecycleStatus but holds each report as a +// pre-marshaled protojson blob so the LifecycleReport's protobuf zero-value fields are +// emitted consistently with the groups payload. 
+type lifecycleStatusJSON struct { + PodName string `json:"pod_name"` + Reports []json.RawMessage `json:"reports"` +} + +func marshalLifecycleStatuses(statuses []*lifecycle.PodLifecycleStatus) ([]lifecycleStatusJSON, error) { + out := make([]lifecycleStatusJSON, 0, len(statuses)) + for _, s := range statuses { + reports := make([]json.RawMessage, 0, len(s.Reports)) + for _, r := range s.Reports { + raw, err := lifecycleGroupMarshaler.Marshal(r) + if err != nil { + return nil, fmt.Errorf("marshal report %q: %w", r.GetFilename(), err) + } + reports = append(reports, raw) + } + out = append(out, lifecycleStatusJSON{PodName: s.PodName, Reports: reports}) + } + return out, nil +} + // handleClusterTopology handles GET /cluster/topology endpoint. func (s *Server) handleClusterTopology(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { diff --git a/fodc/proxy/internal/api/server_test.go b/fodc/proxy/internal/api/server_test.go index b39d8c609..a4cf4ba1f 100644 --- a/fodc/proxy/internal/api/server_test.go +++ b/fodc/proxy/internal/api/server_test.go @@ -833,3 +833,199 @@ func TestHandleClusterLifecycle_NoAgents_FlagsErrorInBody(t *testing.T) { assert.EqualValues(t, 0, summary["total"], "agent_summary.total should be 0 when no agents are registered") assert.EqualValues(t, 0, summary["responded"], "agent_summary.responded should be 0 when no agents are registered") } + +// runLifecycleHandlerWithGroup wires up a lifecycle manager backed by a single agent that +// reports the supplied group, invokes the HTTP handler, and returns the parsed response +// body so callers can inspect raw JSON keys. 
+func runLifecycleHandlerWithGroup(t *testing.T, group *fodcv1.GroupLifecycleInfo) map[string]interface{} { + t.Helper() + initTestLogger(t) + testLogger := logger.GetLogger("test", "api") + testRegistry := registry.NewAgentRegistry(testLogger, 5*time.Second, 10*time.Second, 100) + t.Cleanup(testRegistry.Stop) + mockSender := &mockRequestSender{} + aggregator := metrics.NewAggregator(testRegistry, mockSender, testLogger) + + mockLifecycleRequester := &mockLifecycleDataRequester{ + dataByAgent: make(map[string]*fodcv1.LifecycleData), + podByAgent: make(map[string]string), + } + lifecycleMgr := lifecycle.NewManager(testRegistry, mockLifecycleRequester, testLogger) + mockLifecycleRequester.lifecycleMgr = lifecycleMgr + + identity := registry.AgentIdentity{ + PodName: "test-pod-1", + Role: "datanode", + ContainerNames: []string{"banyandb"}, + } + agentID, registerErr := testRegistry.RegisterAgent(context.Background(), identity) + require.NoError(t, registerErr) + + mockLifecycleRequester.dataByAgent[agentID] = &fodcv1.LifecycleData{ + Groups: []*fodcv1.GroupLifecycleInfo{group}, + } + mockLifecycleRequester.podByAgent[agentID] = "test-pod-1" + + server := NewServer(aggregator, nil, lifecycleMgr, testRegistry, testLogger) + req := httptest.NewRequest(http.MethodGet, "/cluster/lifecycle", nil) + resp := httptest.NewRecorder() + server.handleClusterLifecycle(resp, req) + require.Equal(t, http.StatusOK, resp.Code) + + var body map[string]interface{} + require.NoError(t, json.NewDecoder(resp.Body).Decode(&body)) + return body +} + +// firstGroup pulls the first group from the decoded response body and asserts the basic shape. 
+func firstGroup(t *testing.T, body map[string]interface{}) map[string]interface{} { + t.Helper() + groups, ok := body["groups"].([]interface{}) + require.True(t, ok, "response body must contain a groups array") + require.NotEmpty(t, groups, "groups array must not be empty") + g, ok := groups[0].(map[string]interface{}) + require.True(t, ok, "first group must decode to an object") + return g +} + +func TestHandleClusterLifecycle_UsesSnakeCase(t *testing.T) { + group := &fodcv1.GroupLifecycleInfo{ + Name: "g1", + ResourceOpts: &commonv1.ResourceOpts{ + ShardNum: 4, + }, + } + body := runLifecycleHandlerWithGroup(t, group) + g := firstGroup(t, body) + + resourceOpts, ok := g["resource_opts"].(map[string]interface{}) + require.True(t, ok, "field name must be snake_case 'resource_opts', not 'resourceOpts'") + assert.Equal(t, float64(4), resourceOpts["shard_num"], "field name must be snake_case 'shard_num'") + _, hasCamel := g["resourceOpts"] + assert.False(t, hasCamel, "camelCase key must not be emitted") +} + +func TestHandleClusterLifecycle_EmitsZeroValueFields(t *testing.T) { + group := &fodcv1.GroupLifecycleInfo{ + Name: "g1", + ResourceOpts: &commonv1.ResourceOpts{ + ShardNum: 4, + Replicas: 0, // zero value must still appear in JSON output + Stages: []*commonv1.LifecycleStage{ + { + Name: "warm", + ShardNum: 2, + Close: false, // zero value must still appear + Replicas: 0, // zero value must still appear + }, + }, + }, + } + body := runLifecycleHandlerWithGroup(t, group) + g := firstGroup(t, body) + + resourceOpts := g["resource_opts"].(map[string]interface{}) + _, hasReplicas := resourceOpts["replicas"] + assert.True(t, hasReplicas, "zero-value replicas at ResourceOpts level must be emitted, not omitted") + assert.Equal(t, float64(0), resourceOpts["replicas"]) + + stages, ok := resourceOpts["stages"].([]interface{}) + require.True(t, ok) + require.Len(t, stages, 1) + stage := stages[0].(map[string]interface{}) + + _, hasClose := stage["close"] + assert.True(t, 
hasClose, "zero-value close=false in LifecycleStage must be emitted") + assert.Equal(t, false, stage["close"]) + + _, hasStageReplicas := stage["replicas"] + assert.True(t, hasStageReplicas, "zero-value replicas in LifecycleStage must be emitted") + assert.Equal(t, float64(0), stage["replicas"]) +} + +func TestHandleClusterLifecycle_NilResourceOptsIsNull(t *testing.T) { + group := &fodcv1.GroupLifecycleInfo{ + Name: "g1", + // ResourceOpts intentionally nil + } + body := runLifecycleHandlerWithGroup(t, group) + g := firstGroup(t, body) + + require.Contains(t, g, "resource_opts", "EmitUnpopulated must surface the absent message as a key") + assert.Nil(t, g["resource_opts"], "nil protobuf message marshals to JSON null under EmitUnpopulated") +} + +func TestHandleClusterLifecycle_DataInfoEmitted(t *testing.T) { + group := &fodcv1.GroupLifecycleInfo{ + Name: "g1", + DataInfo: []*databasev1.DataInfo{ + {DataSizeBytes: 1024}, + }, + } + body := runLifecycleHandlerWithGroup(t, group) + g := firstGroup(t, body) + + dataInfo, ok := g["data_info"].([]interface{}) + require.True(t, ok, "data_info must be encoded as snake_case array") + require.Len(t, dataInfo, 1) +} + +func TestHandleClusterLifecycle_StatusReportsUseProtoJSON(t *testing.T) { + initTestLogger(t) + testLogger := logger.GetLogger("test", "api") + testRegistry := registry.NewAgentRegistry(testLogger, 5*time.Second, 10*time.Second, 100) + t.Cleanup(testRegistry.Stop) + mockSender := &mockRequestSender{} + aggregator := metrics.NewAggregator(testRegistry, mockSender, testLogger) + + mockLifecycleRequester := &mockLifecycleDataRequester{ + dataByAgent: make(map[string]*fodcv1.LifecycleData), + podByAgent: make(map[string]string), + } + lifecycleMgr := lifecycle.NewManager(testRegistry, mockLifecycleRequester, testLogger) + mockLifecycleRequester.lifecycleMgr = lifecycleMgr + + identity := registry.AgentIdentity{ + PodName: "test-pod-1", + Role: "datanode", + ContainerNames: []string{"banyandb"}, + } + agentID, 
registerErr := testRegistry.RegisterAgent(context.Background(), identity) + require.NoError(t, registerErr) + + mockLifecycleRequester.dataByAgent[agentID] = &fodcv1.LifecycleData{ + Reports: []*fodcv1.LifecycleReport{ + {Filename: "2026-04-29.json", ReportJson: `{"status":"ok"}`}, + {Filename: "2026-04-28.json", ReportJson: ""}, // empty value must still be emitted + }, + } + mockLifecycleRequester.podByAgent[agentID] = "test-pod-1" + + server := NewServer(aggregator, nil, lifecycleMgr, testRegistry, testLogger) + req := httptest.NewRequest(http.MethodGet, "/cluster/lifecycle", nil) + resp := httptest.NewRecorder() + server.handleClusterLifecycle(resp, req) + require.Equal(t, http.StatusOK, resp.Code) + + var body map[string]interface{} + require.NoError(t, json.NewDecoder(resp.Body).Decode(&body)) + + statuses, ok := body["lifecycle_statuses"].([]interface{}) + require.True(t, ok) + require.Len(t, statuses, 1) + status := statuses[0].(map[string]interface{}) + assert.Equal(t, "test-pod-1", status["pod_name"]) + + reports, ok := status["reports"].([]interface{}) + require.True(t, ok) + require.Len(t, reports, 2) + + first := reports[0].(map[string]interface{}) + assert.Equal(t, "2026-04-29.json", first["filename"]) + assert.Equal(t, `{"status":"ok"}`, first["report_json"], "field must be snake_case 'report_json'") + + second := reports[1].(map[string]interface{}) + _, hasReportJSON := second["report_json"] + assert.True(t, hasReportJSON, "empty report_json must still be emitted under EmitUnpopulated") + assert.Equal(t, "", second["report_json"]) +} diff --git a/fodc/proxy/internal/lifecycle/manager.go b/fodc/proxy/internal/lifecycle/manager.go index 207354730..23c7bc9db 100644 --- a/fodc/proxy/internal/lifecycle/manager.go +++ b/fodc/proxy/internal/lifecycle/manager.go @@ -21,14 +21,19 @@ package lifecycle import ( "context" "sync" - "time" fodcv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1" + 
"github.com/apache/skywalking-banyandb/fodc/internal/timeouts" "github.com/apache/skywalking-banyandb/fodc/proxy/internal/registry" "github.com/apache/skywalking-banyandb/pkg/logger" ) -const defaultCollectionTimeout = 10 * time.Second +// defaultCollectionTimeout bounds how long the proxy waits for each agent to push back +// its lifecycle data. It is derived from the agent-side InspectAll timeout plus a fixed +// slack so that this deadline is strictly greater than the agent's own deadline; the +// proxy must always outlast the agent and never give up while a still-progressing +// InspectAll call is in flight on the agent side. +const defaultCollectionTimeout = timeouts.AgentInspectAll + timeouts.ProxySlack // PodLifecycleStatus represents lifecycle status for a single pod. type PodLifecycleStatus struct {
