This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch v0.10.x
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 5cfe59a124779b5cbb3fdfadcf4066809fa183c5
Author: mrproliu <[email protected]>
AuthorDate: Thu Apr 30 10:11:53 2026 +0800

    Fix wrong encoding in FODC lifecycle endpoint and increase the gRPC timeout 
between agent and proxy (#1103)
---
 CHANGES.md                                      |   2 +
 fodc/agent/internal/lifecycle/collector.go      |  56 +++--
 fodc/agent/internal/lifecycle/collector_test.go | 263 ++++++++++++++++++++++--
 fodc/internal/timeouts/timeouts.go              |  34 +++
 fodc/proxy/internal/api/server.go               |  66 +++++-
 fodc/proxy/internal/api/server_test.go          | 196 ++++++++++++++++++
 fodc/proxy/internal/lifecycle/manager.go        |   9 +-
 7 files changed, 584 insertions(+), 42 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index d5b5ab50d..6356ba45e 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -29,6 +29,8 @@ Release Notes.
 - Avoid FODC lifecycle inspection failing on busy data nodes by raising the 
per-broadcast `CollectDataInfo` / `CollectLiaisonInfo` deadline from 5s to 30s 
and parallelizing per-group inspection in the cluster-internal `InspectAll`.
 - Fix flaky `file_snapshot` subtest in measure/stream/trace by waiting until 
every introduced mem part has been flushed to disk, instead of only checking 
the latest snapshot creator.
 - Fix flaky `TestCollectWithPartialClosedSegments` by raising 
`SegmentIdleTimeout` so wall-clock variance on slow CI does not mark still-open 
segments as idle.
+- Fix FODC lifecycle cache poisoning where transient `InspectAll` failures 
were cached for 10 minutes and masked liaison recovery; raise FODC agent and 
proxy timeouts from 10s to 40s.
+- Fix FODC `/cluster/lifecycle` dropping zero-valued group fields (e.g. 
`replicas=0`, `close=false`) under `encoding/json` + `omitempty`; switch to 
`protojson` so all fields are emitted (nil nested messages serialize as `null`).
 
 ### Chores
 
diff --git a/fodc/agent/internal/lifecycle/collector.go 
b/fodc/agent/internal/lifecycle/collector.go
index 6b9c559ff..1f94636dc 100644
--- a/fodc/agent/internal/lifecycle/collector.go
+++ b/fodc/agent/internal/lifecycle/collector.go
@@ -20,6 +20,7 @@ package lifecycle
 
 import (
        "context"
+       "fmt"
        "os"
        "path/filepath"
        "sort"
@@ -34,6 +35,7 @@ import (
        "google.golang.org/grpc/status"
 
        fodcv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
+       "github.com/apache/skywalking-banyandb/fodc/internal/timeouts"
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
@@ -44,8 +46,6 @@ const (
        maxReportFileSize = 5 * 1024 * 1024 // 5MB
 )
 
-const grpcTimeout = 10 * time.Second
-
 // Collector collects lifecycle data from local files.
 type Collector struct {
        lastCollectTime   time.Time
@@ -73,8 +73,10 @@ func NewCollector(log *logger.Logger, grpcAddr, reportDir 
string, cacheTTL time.
        }
 }
 
-// Collect collects lifecycle data from local files.
-// Data is cached for cacheTTL duration; after expiry the next call refreshes 
the cache.
+// Collect returns lifecycle data, serving the cache when fresh. A transient 
InspectAll
+// failure is propagated to the caller (the cache is left untouched so the 
next call
+// retries) so that callers can distinguish a real failure from a healthy 
liaison that
+// happens to have zero groups.
 func (c *Collector) Collect(ctx context.Context) (*fodcv1.LifecycleData, 
error) {
        c.mu.RLock()
        if c.currentData != nil && c.cacheTTL > 0 && 
c.nowFunc().Sub(c.lastCollectTime) < c.cacheTTL {
@@ -84,9 +86,19 @@ func (c *Collector) Collect(ctx context.Context) 
(*fodcv1.LifecycleData, error)
        }
        c.mu.RUnlock()
 
+       reports := c.readReportFiles()
+       groups, err := c.collectGroups(ctx)
+       if err != nil {
+               if c.log != nil {
+                       c.log.Warn().Err(err).Int("reports", len(reports)).
+                               Msg("InspectAll failed; cache untouched, 
propagating error to caller")
+               }
+               return nil, err
+       }
+
        data := &fodcv1.LifecycleData{
-               Reports: c.readReportFiles(),
-               Groups:  c.collectGroups(ctx),
+               Reports: reports,
+               Groups:  groups,
        }
        c.mu.Lock()
        c.currentData = data
@@ -95,23 +107,28 @@ func (c *Collector) Collect(ctx context.Context) 
(*fodcv1.LifecycleData, error)
        return data, nil
 }
 
-func (c *Collector) collectGroups(ctx context.Context) 
[]*fodcv1.GroupLifecycleInfo {
+// collectGroups invokes InspectAll on the local liaison.
+//
+// Return contract:
+//   - (nil, nil): nothing to collect — grpcAddr is empty, or InspectAll is Unimplemented (detected on this call, or remembered from an earlier call).
+//     Callers treat this as a successful collection with no groups.
+//   - (non-nil slice, nil): RPC succeeded. The slice is empty (non-nil) when the server returned no groups.
+//   - (nil, err): any other failure (DeadlineExceeded, Unavailable, gRPC client creation failure, etc.), treated as transient.
+//     Callers must NOT cache this outcome.
+func (c *Collector) collectGroups(ctx context.Context) 
([]*fodcv1.GroupLifecycleInfo, error) {
        if c.grpcAddr == "" || c.grpcUnimplemented.Load() {
-               return nil
+               return nil, nil
        }
        conn, err := grpc.NewClient(c.grpcAddr, 
grpc.WithTransportCredentials(insecure.NewCredentials()))
        if err != nil {
-               if c.log != nil {
-                       c.log.Warn().Err(err).Str("addr", 
c.grpcAddr).Msg("Failed to create gRPC connection for InspectAll")
-               }
-               return nil
+               return nil, fmt.Errorf("dial %s: %w", c.grpcAddr, err)
        }
        defer func() {
                _ = conn.Close()
        }()
 
        client := fodcv1.NewGroupLifecycleServiceClient(conn)
-       reqCtx, cancel := context.WithTimeout(ctx, grpcTimeout)
+       reqCtx, cancel := context.WithTimeout(ctx, timeouts.AgentInspectAll)
        defer cancel()
        resp, err := client.InspectAll(reqCtx, &fodcv1.InspectAllRequest{})
        if err != nil {
@@ -120,14 +137,15 @@ func (c *Collector) collectGroups(ctx context.Context) 
[]*fodcv1.GroupLifecycleI
                        if c.log != nil {
                                c.log.Info().Str("addr", 
c.grpcAddr).Msg("GroupLifecycleService.InspectAll is not implemented, skipping 
future calls")
                        }
-                       return nil
+                       return nil, nil
                }
-               if c.log != nil {
-                       c.log.Warn().Err(err).Str("addr", 
c.grpcAddr).Msg("InspectAll failed, skipping groups")
-               }
-               return nil
+               return nil, fmt.Errorf("InspectAll on %s: %w", c.grpcAddr, err)
+       }
+       got := resp.GetGroups()
+       if got == nil {
+               return []*fodcv1.GroupLifecycleInfo{}, nil
        }
-       return resp.GetGroups()
+       return got, nil
 }
 
 func (c *Collector) readReportFiles() []*fodcv1.LifecycleReport {
diff --git a/fodc/agent/internal/lifecycle/collector_test.go 
b/fodc/agent/internal/lifecycle/collector_test.go
index 0be13c09a..ab44d5053 100644
--- a/fodc/agent/internal/lifecycle/collector_test.go
+++ b/fodc/agent/internal/lifecycle/collector_test.go
@@ -18,14 +18,21 @@
 package lifecycle
 
 import (
+       "context"
+       "net"
        "os"
        "path/filepath"
+       "sync"
        "testing"
        "time"
 
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
 
+       fodcv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
@@ -36,6 +43,73 @@ func initTestLogger(t *testing.T) *logger.Logger {
        return logger.GetLogger("test", "lifecycle")
 }
 
+// fakeLifecycleService is a programmable test double for 
GroupLifecycleService.InspectAll.
+// Each registered behavior is consumed in order; the call counter lets tests 
assert how
+// many times Collect actually reached the gRPC server (i.e. whether the cache 
served the
+// request without an RPC).
+type fakeLifecycleService struct {
+       fodcv1.UnimplementedGroupLifecycleServiceServer
+       behaviors []func() (*fodcv1.InspectAllResponse, error)
+       mu        sync.Mutex
+       callCount int
+}
+
+func (f *fakeLifecycleService) InspectAll(_ context.Context, _ 
*fodcv1.InspectAllRequest) (*fodcv1.InspectAllResponse, error) {
+       f.mu.Lock()
+       defer f.mu.Unlock()
+       if f.callCount >= len(f.behaviors) {
+               f.callCount++
+               return nil, status.Error(codes.Internal, "fakeLifecycleService: 
behavior list exhausted")
+       }
+       behavior := f.behaviors[f.callCount]
+       f.callCount++
+       return behavior()
+}
+
+func (f *fakeLifecycleService) calls() int {
+       f.mu.Lock()
+       defer f.mu.Unlock()
+       return f.callCount
+}
+
+func returnGroups(groups []*fodcv1.GroupLifecycleInfo) func() 
(*fodcv1.InspectAllResponse, error) {
+       return func() (*fodcv1.InspectAllResponse, error) {
+               return &fodcv1.InspectAllResponse{Groups: groups}, nil
+       }
+}
+
+func returnError(code codes.Code, msg string) func() 
(*fodcv1.InspectAllResponse, error) {
+       return func() (*fodcv1.InspectAllResponse, error) {
+               return nil, status.Error(code, msg)
+       }
+}
+
+// startLocalServer spins up a real gRPC server on a random localhost port 
hosting the
+// supplied fake and returns the listen address. The server is torn down via 
t.Cleanup.
+func startLocalServer(t *testing.T, fake *fakeLifecycleService) string {
+       t.Helper()
+       lis, err := net.Listen("tcp", "127.0.0.1:0")
+       require.NoError(t, err)
+       srv := grpc.NewServer()
+       fodcv1.RegisterGroupLifecycleServiceServer(srv, fake)
+       go func() {
+               _ = srv.Serve(lis)
+       }()
+       t.Cleanup(srv.Stop)
+       return lis.Addr().String()
+}
+
+// newCollectorForFake constructs a Collector wired to a fresh local gRPC 
server backed by fake.
+func newCollectorForFake(t *testing.T, fake *fakeLifecycleService, cacheTTL 
time.Duration) *Collector {
+       t.Helper()
+       addr := startLocalServer(t, fake)
+       return NewCollector(initTestLogger(t), addr, t.TempDir(), cacheTTL)
+}
+
+func sampleGroup(name string) *fodcv1.GroupLifecycleInfo {
+       return &fodcv1.GroupLifecycleInfo{Name: name}
+}
+
 func TestNewCollector(t *testing.T) {
        log := initTestLogger(t)
        collector := NewCollector(log, "", "", 10*time.Minute)
@@ -51,11 +125,10 @@ func TestCollector_ReadReportFiles_EmptyDir(t *testing.T) {
        assert.Empty(t, reports)
 }
 
-func TestCollector_Collect(t *testing.T) {
+func TestCollector_Collect_NoGRPC(t *testing.T) {
        log := initTestLogger(t)
        collector := NewCollector(log, "", "", 0)
-       ctx := t.Context()
-       data, err := collector.Collect(ctx)
+       data, err := collector.Collect(t.Context())
        require.NoError(t, err)
        require.NotNil(t, data)
        assert.NotNil(t, collector.currentData)
@@ -88,20 +161,19 @@ func TestCollector_CacheTTL(t *testing.T) {
        assert.NotSame(t, data1, data3)
 }
 
-func TestCollector_GrpcUnimplemented_SkipsFutureCalls(t *testing.T) {
+func TestCollector_ZeroTTL_AlwaysRefreshes(t *testing.T) {
        log := initTestLogger(t)
-       collector := NewCollector(log, "localhost:0", "", 0)
-
-       assert.False(t, collector.grpcUnimplemented.Load())
+       collector := NewCollector(log, "", "", 0)
+       ctx := t.Context()
 
-       groups := collector.collectGroups(t.Context())
-       assert.Nil(t, groups)
-       assert.False(t, collector.grpcUnimplemented.Load())
+       data1, err := collector.Collect(ctx)
+       require.NoError(t, err)
+       require.NotNil(t, data1)
 
-       collector.grpcUnimplemented.Store(true)
-       groups = collector.collectGroups(t.Context())
-       assert.Nil(t, groups)
-       assert.True(t, collector.grpcUnimplemented.Load())
+       data2, err := collector.Collect(ctx)
+       require.NoError(t, err)
+       require.NotNil(t, data2)
+       assert.NotSame(t, data1, data2)
 }
 
 func TestCollector_ReadReportFiles_MaxFiles(t *testing.T) {
@@ -109,7 +181,6 @@ func TestCollector_ReadReportFiles_MaxFiles(t *testing.T) {
        dir := t.TempDir()
        collector := NewCollector(log, "", dir, 0)
 
-       // Create 7 files with timestamp-like names so sort order is predictable
        for idx, name := range []string{
                "2026-03-20.json",
                "2026-03-21.json",
@@ -165,18 +236,172 @@ func TestCollector_ReadReportFiles_OversizedFile(t 
*testing.T) {
        assert.Equal(t, "2026-03-26.json", reports[0].Filename)
 }
 
-func TestCollector_ZeroTTL_AlwaysRefreshes(t *testing.T) {
+func TestCollectGroups_NoGRPCAddrShortCircuits(t *testing.T) {
        log := initTestLogger(t)
        collector := NewCollector(log, "", "", 0)
+       groups, err := collector.collectGroups(t.Context())
+       require.NoError(t, err)
+       assert.Nil(t, groups)
+}
+
+func TestCollectGroups_UnimplementedSetsFlagAndCachesNil(t *testing.T) {
+       fake := &fakeLifecycleService{
+               behaviors: []func() (*fodcv1.InspectAllResponse, error){
+                       returnError(codes.Unimplemented, "not supported"),
+               },
+       }
+       collector := newCollectorForFake(t, fake, time.Minute)
+
+       groups, err := collector.collectGroups(t.Context())
+       require.NoError(t, err)
+       assert.Nil(t, groups)
+       assert.True(t, collector.grpcUnimplemented.Load())
+
+       // Subsequent direct calls short-circuit without touching the server.
+       groups2, err := collector.collectGroups(t.Context())
+       require.NoError(t, err)
+       assert.Nil(t, groups2)
+       assert.Equal(t, 1, fake.calls())
+}
+
+func TestCollectGroups_NilResponseGroupsBecomesEmptySlice(t *testing.T) {
+       fake := &fakeLifecycleService{
+               behaviors: []func() (*fodcv1.InspectAllResponse, error){
+                       returnGroups(nil),
+               },
+       }
+       collector := newCollectorForFake(t, fake, time.Minute)
+
+       groups, err := collector.collectGroups(t.Context())
+       require.NoError(t, err)
+       require.NotNil(t, groups, "successful InspectAll with empty groups must 
return non-nil empty slice, not nil")
+       assert.Empty(t, groups)
+}
+
+func TestCollect_CachesSuccessfulResult(t *testing.T) {
+       fake := &fakeLifecycleService{
+               behaviors: []func() (*fodcv1.InspectAllResponse, error){
+                       
returnGroups([]*fodcv1.GroupLifecycleInfo{sampleGroup("g1"), 
sampleGroup("g2")}),
+               },
+       }
+       collector := newCollectorForFake(t, fake, time.Minute)
        ctx := t.Context()
 
        data1, err := collector.Collect(ctx)
        require.NoError(t, err)
-       require.NotNil(t, data1)
+       require.Len(t, data1.Groups, 2)
 
-       // With zero TTL, every call should re-collect
+       // Within the TTL, the second call must be served from the cache without an RPC.
        data2, err := collector.Collect(ctx)
        require.NoError(t, err)
-       require.NotNil(t, data2)
-       assert.NotSame(t, data1, data2)
+       assert.Same(t, data1, data2, "second Collect within TTL must return the 
cached pointer")
+       assert.Equal(t, 1, fake.calls(), "InspectAll must not be invoked on 
cache hit")
+}
+
+func TestCollect_DoesNotCacheTransientFailure(t *testing.T) {
+       fake := &fakeLifecycleService{
+               behaviors: []func() (*fodcv1.InspectAllResponse, error){
+                       returnError(codes.DeadlineExceeded, "simulated 
timeout"),
+                       
returnGroups([]*fodcv1.GroupLifecycleInfo{sampleGroup("g1")}),
+               },
+       }
+       collector := newCollectorForFake(t, fake, 10*time.Minute)
+       ctx := t.Context()
+
+       data1, err := collector.Collect(ctx)
+       require.Error(t, err, "transient failure must propagate to caller")
+       assert.Nil(t, data1, "no LifecycleData on failure")
+
+       collector.mu.RLock()
+       cachedData := collector.currentData
+       cachedTime := collector.lastCollectTime
+       collector.mu.RUnlock()
+       assert.Nil(t, cachedData, "transient failure must not write to cache")
+       assert.True(t, cachedTime.IsZero(), "lastCollectTime must remain zero 
after transient failure")
+
+       data2, err := collector.Collect(ctx)
+       require.NoError(t, err)
+       require.Len(t, data2.Groups, 1, "next call must retry InspectAll, not 
return stale empty")
+       assert.Equal(t, "g1", data2.Groups[0].Name)
+       assert.Equal(t, 2, fake.calls(), "InspectAll must be invoked twice")
+}
+
+func TestCollect_FailurePropagatesError(t *testing.T) {
+       successGroups := []*fodcv1.GroupLifecycleInfo{sampleGroup("g1"), 
sampleGroup("g2"), sampleGroup("g3")}
+       fake := &fakeLifecycleService{
+               behaviors: []func() (*fodcv1.InspectAllResponse, error){
+                       returnGroups(successGroups),
+                       returnError(codes.DeadlineExceeded, "simulated 
timeout"),
+               },
+       }
+       collector := newCollectorForFake(t, fake, 0) // zero TTL forces both 
calls to hit the server
+       ctx := t.Context()
+
+       data1, err := collector.Collect(ctx)
+       require.NoError(t, err)
+       require.Len(t, data1.Groups, 3)
+
+       data2, err := collector.Collect(ctx)
+       require.Error(t, err, "transient failure after a successful collect 
must still surface as an error")
+       assert.Nil(t, data2, "caller must not receive a LifecycleData payload 
on failure")
+       assert.Equal(t, 2, fake.calls())
+}
+
+func TestCollect_TransientFailureDoesNotAdvanceLastCollectTime(t *testing.T) {
+       fake := &fakeLifecycleService{
+               behaviors: []func() (*fodcv1.InspectAllResponse, error){
+                       
returnGroups([]*fodcv1.GroupLifecycleInfo{sampleGroup("g1")}),
+                       returnError(codes.Unavailable, "simulated outage"),
+                       
returnGroups([]*fodcv1.GroupLifecycleInfo{sampleGroup("g1"), 
sampleGroup("g2")}),
+               },
+       }
+       collector := newCollectorForFake(t, fake, time.Hour)
+       ctx := t.Context()
+
+       now := time.Unix(1_000_000_000, 0)
+       collector.nowFunc = func() time.Time { return now }
+
+       _, err := collector.Collect(ctx)
+       require.NoError(t, err)
+       collector.mu.RLock()
+       successTime := collector.lastCollectTime
+       collector.mu.RUnlock()
+       assert.Equal(t, now, successTime)
+
+       now = now.Add(2 * time.Hour) // expire the cache to force a new RPC
+       _, err = collector.Collect(ctx)
+       require.Error(t, err, "transient failure must surface as an error to 
the caller")
+       collector.mu.RLock()
+       failureTime := collector.lastCollectTime
+       collector.mu.RUnlock()
+       assert.Equal(t, successTime, failureTime, "transient failure must not 
advance lastCollectTime")
+
+       data, err := collector.Collect(ctx)
+       require.NoError(t, err)
+       require.Len(t, data.Groups, 2, "after recovery, fresh groups must be 
cached")
+       assert.Equal(t, 3, fake.calls())
+}
+
+func TestCollect_UnimplementedCachesAndStops(t *testing.T) {
+       fake := &fakeLifecycleService{
+               behaviors: []func() (*fodcv1.InspectAllResponse, error){
+                       returnError(codes.Unimplemented, "not supported"),
+               },
+       }
+       collector := newCollectorForFake(t, fake, 10*time.Minute)
+       ctx := t.Context()
+
+       data1, err := collector.Collect(ctx)
+       require.NoError(t, err)
+       assert.Nil(t, data1.Groups)
+       assert.True(t, collector.grpcUnimplemented.Load())
+
+       collector.mu.RLock()
+       require.NotNil(t, collector.currentData, "Unimplemented is a known 
terminal state and must be cached")
+       collector.mu.RUnlock()
+
+       data2, err := collector.Collect(ctx)
+       require.NoError(t, err)
+       assert.Same(t, data1, data2, "subsequent calls within TTL must hit 
cache")
+       assert.Equal(t, 1, fake.calls(), "Unimplemented must short-circuit 
subsequent RPCs")
 }
diff --git a/fodc/internal/timeouts/timeouts.go 
b/fodc/internal/timeouts/timeouts.go
new file mode 100644
index 000000000..08d9e958c
--- /dev/null
+++ b/fodc/internal/timeouts/timeouts.go
@@ -0,0 +1,34 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package timeouts holds timeout constants shared between the FODC agent and proxy.
+// Keeping them in one place makes their relationship explicit: the proxy waits
+// AgentInspectAll plus ProxySlack per agent, so its deadline is always strictly greater
+// than the agent-side InspectAll timeout, regardless of who imports them.
+package timeouts
+
+import "time"
+
+// AgentInspectAll bounds how long a single InspectAll call against the local 
liaison
+// may run on the FODC agent side.
+const AgentInspectAll = 40 * time.Second
+
+// ProxySlack is the additional time the proxy waits, on top of 
AgentInspectAll, before
+// declaring an agent unresponsive. It must be strictly greater than zero so 
the proxy
+// always outlasts the agent's own deadline and never gives up while a slow 
but still
+// progressing InspectAll call is in flight.
+const ProxySlack = 10 * time.Second
diff --git a/fodc/proxy/internal/api/server.go 
b/fodc/proxy/internal/api/server.go
index 3ff5de920..f8c9338fa 100644
--- a/fodc/proxy/internal/api/server.go
+++ b/fodc/proxy/internal/api/server.go
@@ -28,6 +28,9 @@ import (
        "strings"
        "time"
 
+       "google.golang.org/protobuf/encoding/protojson"
+
+       fodcv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
        "github.com/apache/skywalking-banyandb/fodc/proxy/internal/cluster"
        "github.com/apache/skywalking-banyandb/fodc/proxy/internal/lifecycle"
        "github.com/apache/skywalking-banyandb/fodc/proxy/internal/metrics"
@@ -35,6 +38,15 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
+// lifecycleGroupMarshaler renders lifecycle protobuf messages (groups and status reports) with
+// proto field names (snake_case) and emits zero-value fields (replicas=0, close=false, empty
+// stages, etc.) as explicit JSON keys. encoding/json over protoc-gen-go structs would drop them
+// via `omitempty`, leaving downstream consumers (SRE agent) unable to tell "absent" from "zero".
+var lifecycleGroupMarshaler = protojson.MarshalOptions{
+       UseProtoNames:   true,
+       EmitUnpopulated: true,
+}
+
 // Server exposes REST and Prometheus-style endpoints for external consumption.
 type Server struct {
        metricsAggregator     *metrics.Aggregator
@@ -496,9 +508,23 @@ func (s *Server) handleClusterLifecycle(w 
http.ResponseWriter, r *http.Request)
        }
        lifecycleData, agentSummary := 
s.lifecycleManager.CollectLifecycle(r.Context())
 
+       groupsJSON, err := marshalLifecycleGroups(lifecycleData.Groups)
+       if err != nil {
+               s.logger.Error().Err(err).Msg("Failed to marshal lifecycle 
groups")
+               http.Error(w, "Failed to serialize lifecycle groups", 
http.StatusInternalServerError)
+               return
+       }
+
+       statusesJSON, err := 
marshalLifecycleStatuses(lifecycleData.LifecycleStatuses)
+       if err != nil {
+               s.logger.Error().Err(err).Msg("Failed to marshal lifecycle 
statuses")
+               http.Error(w, "Failed to serialize lifecycle statuses", 
http.StatusInternalServerError)
+               return
+       }
+
        body := map[string]interface{}{
-               "groups":             lifecycleData.Groups,
-               "lifecycle_statuses": lifecycleData.LifecycleStatuses,
+               "groups":             groupsJSON,
+               "lifecycle_statuses": statusesJSON,
                "agent_summary":      agentSummary,
        }
        switch {
@@ -519,6 +545,42 @@ func (s *Server) handleClusterLifecycle(w 
http.ResponseWriter, r *http.Request)
        }
 }
 
+func marshalLifecycleGroups(groups []*fodcv1.GroupLifecycleInfo) 
([]json.RawMessage, error) {
+       out := make([]json.RawMessage, 0, len(groups))
+       for _, g := range groups {
+               raw, err := lifecycleGroupMarshaler.Marshal(g)
+               if err != nil {
+                       return nil, fmt.Errorf("marshal group %q: %w", 
g.GetName(), err)
+               }
+               out = append(out, raw)
+       }
+       return out, nil
+}
+
+// lifecycleStatusJSON mirrors lifecycle.PodLifecycleStatus but holds each 
report as a
+// pre-marshaled protojson blob so the LifecycleReport's protobuf zero-value 
fields are
+// emitted consistently with the groups payload.
+type lifecycleStatusJSON struct {
+       PodName string            `json:"pod_name"`
+       Reports []json.RawMessage `json:"reports"`
+}
+
+func marshalLifecycleStatuses(statuses []*lifecycle.PodLifecycleStatus) 
([]lifecycleStatusJSON, error) {
+       out := make([]lifecycleStatusJSON, 0, len(statuses))
+       for _, s := range statuses {
+               reports := make([]json.RawMessage, 0, len(s.Reports))
+               for _, r := range s.Reports {
+                       raw, err := lifecycleGroupMarshaler.Marshal(r)
+                       if err != nil {
+                               return nil, fmt.Errorf("marshal report %q: %w", 
r.GetFilename(), err)
+                       }
+                       reports = append(reports, raw)
+               }
+               out = append(out, lifecycleStatusJSON{PodName: s.PodName, 
Reports: reports})
+       }
+       return out, nil
+}
+
 // handleClusterTopology handles GET /cluster/topology endpoint.
 func (s *Server) handleClusterTopology(w http.ResponseWriter, r *http.Request) 
{
        if r.Method != http.MethodGet {
diff --git a/fodc/proxy/internal/api/server_test.go 
b/fodc/proxy/internal/api/server_test.go
index b39d8c609..a4cf4ba1f 100644
--- a/fodc/proxy/internal/api/server_test.go
+++ b/fodc/proxy/internal/api/server_test.go
@@ -833,3 +833,199 @@ func 
TestHandleClusterLifecycle_NoAgents_FlagsErrorInBody(t *testing.T) {
        assert.EqualValues(t, 0, summary["total"], "agent_summary.total should 
be 0 when no agents are registered")
        assert.EqualValues(t, 0, summary["responded"], "agent_summary.responded 
should be 0 when no agents are registered")
 }
+
+// runLifecycleHandlerWithGroup wires up a lifecycle manager backed by a 
single agent that
+// reports the supplied group, invokes the HTTP handler, and returns the 
parsed response
+// body so callers can inspect raw JSON keys.
+func runLifecycleHandlerWithGroup(t *testing.T, group 
*fodcv1.GroupLifecycleInfo) map[string]interface{} {
+       t.Helper()
+       initTestLogger(t)
+       testLogger := logger.GetLogger("test", "api")
+       testRegistry := registry.NewAgentRegistry(testLogger, 5*time.Second, 
10*time.Second, 100)
+       t.Cleanup(testRegistry.Stop)
+       mockSender := &mockRequestSender{}
+       aggregator := metrics.NewAggregator(testRegistry, mockSender, 
testLogger)
+
+       mockLifecycleRequester := &mockLifecycleDataRequester{
+               dataByAgent: make(map[string]*fodcv1.LifecycleData),
+               podByAgent:  make(map[string]string),
+       }
+       lifecycleMgr := lifecycle.NewManager(testRegistry, 
mockLifecycleRequester, testLogger)
+       mockLifecycleRequester.lifecycleMgr = lifecycleMgr
+
+       identity := registry.AgentIdentity{
+               PodName:        "test-pod-1",
+               Role:           "datanode",
+               ContainerNames: []string{"banyandb"},
+       }
+       agentID, registerErr := 
testRegistry.RegisterAgent(context.Background(), identity)
+       require.NoError(t, registerErr)
+
+       mockLifecycleRequester.dataByAgent[agentID] = &fodcv1.LifecycleData{
+               Groups: []*fodcv1.GroupLifecycleInfo{group},
+       }
+       mockLifecycleRequester.podByAgent[agentID] = "test-pod-1"
+
+       server := NewServer(aggregator, nil, lifecycleMgr, testRegistry, 
testLogger)
+       req := httptest.NewRequest(http.MethodGet, "/cluster/lifecycle", nil)
+       resp := httptest.NewRecorder()
+       server.handleClusterLifecycle(resp, req)
+       require.Equal(t, http.StatusOK, resp.Code)
+
+       var body map[string]interface{}
+       require.NoError(t, json.NewDecoder(resp.Body).Decode(&body))
+       return body
+}
+
+// firstGroup pulls the first group from the decoded response body and asserts 
the basic shape.
+func firstGroup(t *testing.T, body map[string]interface{}) 
map[string]interface{} {
+       t.Helper()
+       groups, ok := body["groups"].([]interface{})
+       require.True(t, ok, "response body must contain a groups array")
+       require.NotEmpty(t, groups, "groups array must not be empty")
+       g, ok := groups[0].(map[string]interface{})
+       require.True(t, ok, "first group must decode to an object")
+       return g
+}
+
+func TestHandleClusterLifecycle_UsesSnakeCase(t *testing.T) {
+       group := &fodcv1.GroupLifecycleInfo{
+               Name: "g1",
+               ResourceOpts: &commonv1.ResourceOpts{
+                       ShardNum: 4,
+               },
+       }
+       body := runLifecycleHandlerWithGroup(t, group)
+       g := firstGroup(t, body)
+
+       resourceOpts, ok := g["resource_opts"].(map[string]interface{})
+       require.True(t, ok, "field name must be snake_case 'resource_opts', not 
'resourceOpts'")
+       assert.Equal(t, float64(4), resourceOpts["shard_num"], "field name must 
be snake_case 'shard_num'")
+       _, hasCamel := g["resourceOpts"]
+       assert.False(t, hasCamel, "camelCase key must not be emitted")
+}
+
+func TestHandleClusterLifecycle_EmitsZeroValueFields(t *testing.T) {
+       group := &fodcv1.GroupLifecycleInfo{
+               Name: "g1",
+               ResourceOpts: &commonv1.ResourceOpts{
+                       ShardNum: 4,
+                       Replicas: 0, // zero value must still appear in JSON 
output
+                       Stages: []*commonv1.LifecycleStage{
+                               {
+                                       Name:     "warm",
+                                       ShardNum: 2,
+                                       Close:    false, // zero value must 
still appear
+                                       Replicas: 0,     // zero value must 
still appear
+                               },
+                       },
+               },
+       }
+       body := runLifecycleHandlerWithGroup(t, group)
+       g := firstGroup(t, body)
+
+       resourceOpts := g["resource_opts"].(map[string]interface{})
+       _, hasReplicas := resourceOpts["replicas"]
+       assert.True(t, hasReplicas, "zero-value replicas at ResourceOpts level 
must be emitted, not omitted")
+       assert.Equal(t, float64(0), resourceOpts["replicas"])
+
+       stages, ok := resourceOpts["stages"].([]interface{})
+       require.True(t, ok)
+       require.Len(t, stages, 1)
+       stage := stages[0].(map[string]interface{})
+
+       _, hasClose := stage["close"]
+       assert.True(t, hasClose, "zero-value close=false in LifecycleStage must 
be emitted")
+       assert.Equal(t, false, stage["close"])
+
+       _, hasStageReplicas := stage["replicas"]
+       assert.True(t, hasStageReplicas, "zero-value replicas in LifecycleStage 
must be emitted")
+       assert.Equal(t, float64(0), stage["replicas"])
+}
+
+func TestHandleClusterLifecycle_NilResourceOptsIsNull(t *testing.T) {
+       group := &fodcv1.GroupLifecycleInfo{
+               Name: "g1",
+               // ResourceOpts intentionally nil
+       }
+       body := runLifecycleHandlerWithGroup(t, group)
+       g := firstGroup(t, body)
+
+       require.Contains(t, g, "resource_opts", "EmitUnpopulated must surface 
the absent message as a key")
+       assert.Nil(t, g["resource_opts"], "nil protobuf message marshals to 
JSON null under EmitUnpopulated")
+}
+
+// TestHandleClusterLifecycle_DataInfoEmitted checks that data_info is emitted as a snake_case array of objects carrying their fields.
+func TestHandleClusterLifecycle_DataInfoEmitted(t *testing.T) {
+       group := &fodcv1.GroupLifecycleInfo{
+               Name: "g1",
+               DataInfo: []*databasev1.DataInfo{
+                       {DataSizeBytes: 1024},
+               },
+       }
+       body := runLifecycleHandlerWithGroup(t, group)
+       g := firstGroup(t, body)
+
+       dataInfo, ok := g["data_info"].([]interface{})
+       require.True(t, ok, "data_info must be encoded as snake_case array")
+       require.Len(t, dataInfo, 1)
+       // Presence only: protojson renders 64-bit integers as JSON strings, so the exact value type depends on the proto field type.
+       entry, ok := dataInfo[0].(map[string]interface{})
+       require.True(t, ok, "data_info entry must decode to an object")
+       assert.Contains(t, entry, "data_size_bytes", "populated DataSizeBytes must be emitted as snake_case 'data_size_bytes'")
+}
+
+// TestHandleClusterLifecycle_StatusReportsUseProtoJSON checks that per-pod lifecycle reports use snake_case keys and keep empty strings.
+func TestHandleClusterLifecycle_StatusReportsUseProtoJSON(t *testing.T) {
+       initTestLogger(t)
+       testLogger := logger.GetLogger("test", "api")
+       testRegistry := registry.NewAgentRegistry(testLogger, 5*time.Second, 10*time.Second, 100)
+       t.Cleanup(testRegistry.Stop)
+       mockSender := &mockRequestSender{}
+       aggregator := metrics.NewAggregator(testRegistry, mockSender, testLogger)
+
+       mockLifecycleRequester := &mockLifecycleDataRequester{
+               dataByAgent: make(map[string]*fodcv1.LifecycleData),
+               podByAgent:  make(map[string]string),
+       }
+       lifecycleMgr := lifecycle.NewManager(testRegistry, mockLifecycleRequester, testLogger)
+       mockLifecycleRequester.lifecycleMgr = lifecycleMgr
+
+       identity := registry.AgentIdentity{
+               PodName:        "test-pod-1",
+               Role:           "datanode",
+               ContainerNames: []string{"banyandb"},
+       }
+       agentID, registerErr := testRegistry.RegisterAgent(context.Background(), identity)
+       require.NoError(t, registerErr)
+
+       mockLifecycleRequester.dataByAgent[agentID] = &fodcv1.LifecycleData{
+               Reports: []*fodcv1.LifecycleReport{
+                       {Filename: "2026-04-29.json", ReportJson: `{"status":"ok"}`},
+                       {Filename: "2026-04-28.json", ReportJson: ""}, // empty value must still be emitted
+               },
+       }
+       mockLifecycleRequester.podByAgent[agentID] = "test-pod-1"
+
+       server := NewServer(aggregator, nil, lifecycleMgr, testRegistry, testLogger)
+       req := httptest.NewRequest(http.MethodGet, "/cluster/lifecycle", nil)
+       resp := httptest.NewRecorder()
+       server.handleClusterLifecycle(resp, req)
+       require.Equal(t, http.StatusOK, resp.Code)
+
+       var body map[string]interface{}
+       require.NoError(t, json.NewDecoder(resp.Body).Decode(&body))
+
+       // Checked assertions: a malformed response fails the test with a message instead of panicking.
+       statuses, ok := body["lifecycle_statuses"].([]interface{})
+       require.True(t, ok, "lifecycle_statuses must decode to an array")
+       require.Len(t, statuses, 1)
+       status, ok := statuses[0].(map[string]interface{})
+       require.True(t, ok, "lifecycle status must decode to an object")
+       assert.Equal(t, "test-pod-1", status["pod_name"])
+
+       reports, ok := status["reports"].([]interface{})
+       require.True(t, ok)
+       require.Len(t, reports, 2)
+
+       first, ok := reports[0].(map[string]interface{})
+       require.True(t, ok, "first report must decode to an object")
+       assert.Equal(t, "2026-04-29.json", first["filename"])
+       assert.Equal(t, `{"status":"ok"}`, first["report_json"], "field must be snake_case 'report_json'")
+
+       second, ok := reports[1].(map[string]interface{})
+       require.True(t, ok, "second report must decode to an object")
+       _, hasReportJSON := second["report_json"]
+       assert.True(t, hasReportJSON, "empty report_json must still be emitted under EmitUnpopulated")
+       assert.Equal(t, "", second["report_json"])
+}
diff --git a/fodc/proxy/internal/lifecycle/manager.go b/fodc/proxy/internal/lifecycle/manager.go
index 207354730..23c7bc9db 100644
--- a/fodc/proxy/internal/lifecycle/manager.go
+++ b/fodc/proxy/internal/lifecycle/manager.go
@@ -21,14 +21,19 @@ package lifecycle
 import (
        "context"
        "sync"
-       "time"
 
        fodcv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
+       "github.com/apache/skywalking-banyandb/fodc/internal/timeouts"
        "github.com/apache/skywalking-banyandb/fodc/proxy/internal/registry"
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
-const defaultCollectionTimeout = 10 * time.Second
+// defaultCollectionTimeout bounds how long the proxy waits for each agent to push back its lifecycle data.
+// It is the agent-side InspectAll timeout plus a fixed slack (timeouts.ProxySlack), so it is strictly
+// greater than the agent's own deadline as long as ProxySlack is positive; the proxy must always
+// outlast the agent and never give up while a still-progressing InspectAll call is in flight on the
+// agent side.
+const defaultCollectionTimeout = timeouts.AgentInspectAll + timeouts.ProxySlack
 
 // PodLifecycleStatus represents lifecycle status for a single pod.
 type PodLifecycleStatus struct {

Reply via email to