This is an automated email from the ASF dual-hosted git repository. ButterBright pushed a commit to branch v0.10.x in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 5fa118632e5da4debd2e3628db30f542ea4ea935 Author: Gao Hongtao <[email protected]> AuthorDate: Fri Jun 5 16:01:34 2026 +0800 Fix FODC agent leaving node_role unspecified on startup race (#1157) * Fix FODC agent leaving node_role unspecified on startup race The agent resolved the node role exactly once at startup via a single GetCurrentNode poll whose endpoint retries spanned only ~1s. When the sibling lifecycle/banyandb gRPC server was not yet listening (connect: cannot assign requested address), the role fell back to ROLE_UNSPECIFIED permanently, so 6 of 7 nodes reported the wrong type instead of ROLE_DATA/ROLE_LIAISON. --- CHANGES.md | 1 + fodc/agent/internal/cluster/collector.go | 48 +++++++++++++++++++++++++ fodc/agent/internal/cluster/collector_test.go | 50 +++++++++++++++++++++++++++ fodc/agent/internal/cmd/root.go | 2 +- 4 files changed, 100 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index c752a8c6b..c6d75b767 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -9,6 +9,7 @@ Release Notes. - Fix trace query identity-tag projection: when `trace_id`/`span_id` are explicitly projected, reconstruct them from span identity at response build time instead of requesting them as stored tags, and preserve tag order with null-filled per-span value alignment in the distributed trace result iterator. - Fix FODC proxy corrupting Prometheus metric types. The agent dropped the `# TYPE` line while parsing banyandb `/metrics`, the `StreamMetrics` proto carried no type field, and the proxy guessed the type from a name-suffix heuristic — downgrading counters to gauge, mislabeling `_count`-suffixed counters as histograms, and splitting summaries into two conflicting `# TYPE` lines. Capture the type with the Prometheus `expfmt` parser, store it in the flight recorder, thread it through a new [...] - Trace storage metrics now expose the `storage` sub-scope, matching the `stream_storage_*` naming. The `StorageMetricsFactory` for trace switched from the root `trace` scope to `trace.storage`, so per-segment inverted-index metrics (`inverted_index_total_updates`, `inverted_index_total_doc_count`, `inverted_index_total_term_searchers_started`) are now emitted as `banyandb_trace_storage_*` instead of `banyandb_trace_*`, aligning the dashboard query names. Other trace metrics (`trace_tst_ [...] +- Fix FODC agent labeling metrics with `node_role="ROLE_UNSPECIFIED"`. The agent resolved the node role exactly once at startup via a single `GetCurrentNode` poll whose endpoint retries spanned only ~1s; when the sibling lifecycle/banyandb gRPC server was not yet listening (`connect: cannot assign requested address`) the role fell back to `ROLE_UNSPECIFIED` permanently, so most nodes never reported their real `ROLE_DATA`/`ROLE_LIAISON`. Retry the initial node-role resolution with exponen [...] ## 0.10.2 diff --git a/fodc/agent/internal/cluster/collector.go b/fodc/agent/internal/cluster/collector.go index ba53c4e0d..42f60e3a0 100644 --- a/fodc/agent/internal/cluster/collector.go +++ b/fodc/agent/internal/cluster/collector.go @@ -39,6 +39,10 @@ const ( initialBackoff = 100 * time.Millisecond maxBackoff = 5 * time.Second nodeInfoFetchTimeout = 30 * time.Second + // nodeRoleResolveBudget bounds how long the initial node-role resolution retries. + // Keep it below nodeInfoFetchTimeout so the poll finishes before StartCollector's + // WaitForNodeFetched deadline. + nodeRoleResolveBudget = 25 * time.Second ) // TopologyMap represents processed cluster data for a single endpoint. @@ -66,6 +70,7 @@ type Collector struct { podName string addrs []string interval time.Duration + resolveBudget time.Duration mu sync.RWMutex } @@ -76,6 +81,7 @@ func NewCollector(log *logger.Logger, addrs []string, interval time.Duration, po addrs: addrs, interval: interval, podName: podName, + resolveBudget: nodeRoleResolveBudget, closer: run.NewCloser(0), nodeFetchedCh: make(chan struct{}), clients: make([]*endpointClient, 0, len(addrs)), @@ -259,6 +265,48 @@ func (c *Collector) updateCurrentNodes(nodes map[string]*databasev1.Node) { c.log.Info().Int("nodes_count", len(nodes)).Msg("Updated current nodes from all endpoints") } +// ResolveNodeRole polls the current node until a resolved (non-unspecified) role is +// obtained or resolveBudget elapses, then returns the role string and labels. All +// per-attempt fetches share a budget-scoped context, so the total runtime is bounded by +// resolveBudget regardless of the per-attempt gRPC timeouts and backoffs. This guards the +// startup race where the sibling lifecycle/banyandb gRPC server is not yet listening when +// the agent first polls, which previously left node_role permanently ROLE_UNSPECIFIED. +// It does not touch nodeFetchedCh, so the collector's readiness signal stays prompt. +func (c *Collector) ResolveNodeRole(ctx context.Context) (nodeRole string, nodeLabels map[string]string) { + budgetCtx, cancel := context.WithTimeout(ctx, c.resolveBudget) + defer cancel() + backoff := initialBackoff + for !c.hasResolvedRole() { + c.pollCurrentNode(budgetCtx) + if c.hasResolvedRole() || budgetCtx.Err() != nil || c.closer.Closed() { + break + } + select { + case <-budgetCtx.Done(): + case <-c.closer.CloseNotify(): + case <-time.After(backoff): + } + if budgetCtx.Err() != nil || c.closer.Closed() { + break + } + backoff = min(backoff*2, maxBackoff) + } + return c.GetNodeInfo() +} + +// hasResolvedRole reports whether any current node has a known (non-unspecified) role. +func (c *Collector) hasResolvedRole() bool { + c.mu.RLock() + defer c.mu.RUnlock() + unspecified := databasev1.Role_name[int32(databasev1.Role_ROLE_UNSPECIFIED)] + for _, node := range c.currentNodes { + if node != nil && NodeRoleFromNode(node) != unspecified { + return true + } + } + return false +} + func (c *Collector) updateClusterStates(states map[string]*databasev1.GetClusterStateResponse) { if len(states) == 0 { c.log.Warn().Msg("Received empty cluster states map") diff --git a/fodc/agent/internal/cluster/collector_test.go b/fodc/agent/internal/cluster/collector_test.go index 4b127ad56..71ab0850b 100644 --- a/fodc/agent/internal/cluster/collector_test.go +++ b/fodc/agent/internal/cluster/collector_test.go @@ -19,11 +19,13 @@ package cluster import ( "context" + "fmt" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/grpc" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" @@ -766,6 +768,54 @@ func TestProcessClusterStates_EmptyPodName(t *testing.T) { } } +type fakeNodeQueryClient struct { + node *databasev1.Node + failCount int + calls int +} + +func (f *fakeNodeQueryClient) GetCurrentNode(_ context.Context, _ *databasev1.GetCurrentNodeRequest, + _ ...grpc.CallOption, +) (*databasev1.GetCurrentNodeResponse, error) { + f.calls++ + if f.calls <= f.failCount { + return nil, fmt.Errorf("simulated unavailable on call %d", f.calls) + } + return &databasev1.GetCurrentNodeResponse{Node: f.node}, nil +} + +func TestCollector_ResolveNodeRole_ResolvesAfterTransientFailure(t *testing.T) { + log := initTestLogger(t) + c := NewCollector(log, []string{"test"}, time.Second, "test-pod") + c.resolveBudget = 5 * time.Second + node := &databasev1.Node{ + Metadata: &commonv1.Metadata{Name: "data-0"}, + Roles: []databasev1.Role{databasev1.Role_ROLE_DATA}, + } + // fetchCurrentNodeFromEndpoint itself retries maxRetries(=3) times per poll, so + // failing the first 3 GetCurrentNode calls forces a second outer poll iteration. + fake := &fakeNodeQueryClient{node: node, failCount: maxRetries} + c.clients = []*endpointClient{{nodeQueryClient: fake, addr: "test"}} + role, _ := c.ResolveNodeRole(context.Background()) + assert.Equal(t, databasev1.Role_name[int32(databasev1.Role_ROLE_DATA)], role) + assert.Greater(t, fake.calls, maxRetries) +} + +func TestCollector_ResolveNodeRole_GivesUpWithinBudget(t *testing.T) { + log := initTestLogger(t) + c := NewCollector(log, []string{"test"}, time.Second, "test-pod") + c.resolveBudget = 500 * time.Millisecond + fake := &fakeNodeQueryClient{failCount: 1 << 30} // always fails + c.clients = []*endpointClient{{nodeQueryClient: fake, addr: "test"}} + start := time.Now() + role, _ := c.ResolveNodeRole(context.Background()) + elapsed := time.Since(start) + assert.Equal(t, databasev1.Role_name[int32(databasev1.Role_ROLE_UNSPECIFIED)], role) + // Total runtime must stay bounded by the budget (plus slack for one in-flight gRPC + // attempt), proving the budget-scoped context actually caps the retry loop. + assert.Less(t, elapsed, c.resolveBudget+2*time.Second) +} + func TestProcessClusterStates_ComplexTopology(t *testing.T) { log := initTestLogger(t) collector := NewCollector(log, []string{"localhost:17914", "localhost:17915", "localhost:17916"}, 10*time.Second, "test-pod") diff --git a/fodc/agent/internal/cmd/root.go b/fodc/agent/internal/cmd/root.go index d0ded635b..3070b9dc3 100644 --- a/fodc/agent/internal/cmd/root.go +++ b/fodc/agent/internal/cmd/root.go @@ -223,7 +223,7 @@ func runFODC(_ *cobra.Command, _ []string) error { var nodeRole string var nodeLabels map[string]string if clusterCollector != nil { - nodeRole, nodeLabels = clusterCollector.GetNodeInfo() + nodeRole, nodeLabels = clusterCollector.ResolveNodeRole(ctx) log.Info().Str("node_role", nodeRole).Int("labels_count", len(nodeLabels)).Msg("Node info fetched") }
