This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 64983ea0d Fix FODC agent leaving node_role unspecified on startup race (#1157)
64983ea0d is described below

commit 64983ea0dc095afa58e40477ef044c710339ea9d
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Jun 5 16:01:34 2026 +0800

    Fix FODC agent leaving node_role unspecified on startup race (#1157)
    
    * Fix FODC agent leaving node_role unspecified on startup race
    
    The agent resolved the node role exactly once at startup via a single
    GetCurrentNode poll whose endpoint retries spanned only ~1s. When the
    sibling lifecycle/banyandb gRPC server was not yet listening
    (connect: cannot assign requested address), the role fell back to
    ROLE_UNSPECIFIED permanently, so 6 of 7 nodes reported the wrong type
    instead of ROLE_DATA/ROLE_LIAISON.
---
 CHANGES.md                                    |  1 +
 fodc/agent/internal/cluster/collector.go      | 48 +++++++++++++++++++++++++
 fodc/agent/internal/cluster/collector_test.go | 50 +++++++++++++++++++++++++++
 fodc/agent/internal/cmd/root.go               |  2 +-
 4 files changed, 100 insertions(+), 1 deletion(-)

diff --git a/CHANGES.md b/CHANGES.md
index 2c4c52f9a..d2c866352 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -106,6 +106,7 @@ Release Notes.
 - Trace storage metrics now expose the `storage` sub-scope, matching the 
`stream_storage_*` naming. The `StorageMetricsFactory` for trace switched from 
the root `trace` scope to `trace.storage`, so per-segment inverted-index 
metrics (`inverted_index_total_updates`, `inverted_index_total_doc_count`, 
`inverted_index_total_term_searchers_started`) are now emitted as 
`banyandb_trace_storage_*` instead of `banyandb_trace_*`, aligning the 
dashboard query names. Other trace metrics (`trace_tst_ [...]
 - Fix flaky measure snapshot tests that gated on the part directory appearing 
in `tab/` as the flush-completion signal. That directory is created by the 
first line of `memPart.mustFlush`, before the mem→file introduction reaches the 
in-memory snapshot and before the `.snp` manifest is persisted, so under 
`-race`/CI load `TakeFileSnapshot` could observe only mem parts and `Close` 
could drop the in-flight flush; gate on the persisted `.snp` manifest instead.
 - Fix FODC proxy corrupting Prometheus metric types. The agent dropped the `# 
TYPE` line while parsing banyandb `/metrics`, the `StreamMetrics` proto carried 
no type field, and the proxy guessed the type from a name-suffix heuristic — 
downgrading counters to gauge, mislabeling `_count`-suffixed counters as 
histograms, and splitting summaries into two conflicting `# TYPE` lines. 
Capture the type with the Prometheus `expfmt` parser, store it in the flight 
recorder, thread it through a new  [...]
+- Fix FODC agent labeling metrics with `node_role="ROLE_UNSPECIFIED"`. The 
agent resolved the node role exactly once at startup via a single 
`GetCurrentNode` poll whose endpoint retries spanned only ~1s; when the sibling 
lifecycle/banyandb gRPC server was not yet listening (`connect: cannot assign 
requested address`) the role fell back to `ROLE_UNSPECIFIED` permanently, so 
most nodes never reported their real `ROLE_DATA`/`ROLE_LIAISON`. Retry the 
initial node-role resolution with exponen [...]
 
 ### Chores
 
diff --git a/fodc/agent/internal/cluster/collector.go b/fodc/agent/internal/cluster/collector.go
index ba53c4e0d..42f60e3a0 100644
--- a/fodc/agent/internal/cluster/collector.go
+++ b/fodc/agent/internal/cluster/collector.go
@@ -39,6 +39,10 @@ const (
        initialBackoff       = 100 * time.Millisecond
        maxBackoff           = 5 * time.Second
        nodeInfoFetchTimeout = 30 * time.Second
+       // nodeRoleResolveBudget bounds how long the initial node-role resolution retries.
+       // Keep it below nodeInfoFetchTimeout so the poll finishes before StartCollector's
+       // WaitForNodeFetched deadline.
+       nodeRoleResolveBudget = 25 * time.Second
 )
 
 // TopologyMap represents processed cluster data for a single endpoint.
@@ -66,6 +70,7 @@ type Collector struct {
        podName         string
        addrs           []string
        interval        time.Duration
+       resolveBudget   time.Duration
        mu              sync.RWMutex
 }
 
@@ -76,6 +81,7 @@ func NewCollector(log *logger.Logger, addrs []string, interval time.Duration, po
                addrs:           addrs,
                interval:        interval,
                podName:         podName,
+               resolveBudget:   nodeRoleResolveBudget,
                closer:          run.NewCloser(0),
                nodeFetchedCh:   make(chan struct{}),
                clients:         make([]*endpointClient, 0, len(addrs)),
@@ -259,6 +265,48 @@ func (c *Collector) updateCurrentNodes(nodes map[string]*databasev1.Node) {
        c.log.Info().Int("nodes_count", len(nodes)).Msg("Updated current nodes 
from all endpoints")
 }
 
+// ResolveNodeRole polls the current node until a resolved (non-unspecified) role is
+// obtained or resolveBudget elapses, then returns the role string and labels. All
+// per-attempt fetches share a budget-scoped context, so the total runtime is bounded by
+// resolveBudget regardless of the per-attempt gRPC timeouts and backoffs. This guards the
+// startup race where the sibling lifecycle/banyandb gRPC server is not yet listening when
+// the agent first polls, which previously left node_role permanently ROLE_UNSPECIFIED.
+// It does not touch nodeFetchedCh, so the collector's readiness signal stays prompt.
+func (c *Collector) ResolveNodeRole(ctx context.Context) (nodeRole string, 
nodeLabels map[string]string) {
+       budgetCtx, cancel := context.WithTimeout(ctx, c.resolveBudget)
+       defer cancel()
+       backoff := initialBackoff
+       for !c.hasResolvedRole() {
+               c.pollCurrentNode(budgetCtx)
+               if c.hasResolvedRole() || budgetCtx.Err() != nil || 
c.closer.Closed() {
+                       break
+               }
+               select {
+               case <-budgetCtx.Done():
+               case <-c.closer.CloseNotify():
+               case <-time.After(backoff):
+               }
+               if budgetCtx.Err() != nil || c.closer.Closed() {
+                       break
+               }
+               backoff = min(backoff*2, maxBackoff)
+       }
+       return c.GetNodeInfo()
+}
+
+// hasResolvedRole reports whether any current node has a known 
(non-unspecified) role.
+func (c *Collector) hasResolvedRole() bool {
+       c.mu.RLock()
+       defer c.mu.RUnlock()
+       unspecified := 
databasev1.Role_name[int32(databasev1.Role_ROLE_UNSPECIFIED)]
+       for _, node := range c.currentNodes {
+               if node != nil && NodeRoleFromNode(node) != unspecified {
+                       return true
+               }
+       }
+       return false
+}
+
 func (c *Collector) updateClusterStates(states 
map[string]*databasev1.GetClusterStateResponse) {
        if len(states) == 0 {
                c.log.Warn().Msg("Received empty cluster states map")
diff --git a/fodc/agent/internal/cluster/collector_test.go b/fodc/agent/internal/cluster/collector_test.go
index 4b127ad56..71ab0850b 100644
--- a/fodc/agent/internal/cluster/collector_test.go
+++ b/fodc/agent/internal/cluster/collector_test.go
@@ -19,11 +19,13 @@ package cluster
 
 import (
        "context"
+       "fmt"
        "testing"
        "time"
 
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
+       "google.golang.org/grpc"
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -766,6 +768,54 @@ func TestProcessClusterStates_EmptyPodName(t *testing.T) {
        }
 }
 
+type fakeNodeQueryClient struct {
+       node      *databasev1.Node
+       failCount int
+       calls     int
+}
+
+func (f *fakeNodeQueryClient) GetCurrentNode(_ context.Context, _ 
*databasev1.GetCurrentNodeRequest,
+       _ ...grpc.CallOption,
+) (*databasev1.GetCurrentNodeResponse, error) {
+       f.calls++
+       if f.calls <= f.failCount {
+               return nil, fmt.Errorf("simulated unavailable on call %d", 
f.calls)
+       }
+       return &databasev1.GetCurrentNodeResponse{Node: f.node}, nil
+}
+
+func TestCollector_ResolveNodeRole_ResolvesAfterTransientFailure(t *testing.T) 
{
+       log := initTestLogger(t)
+       c := NewCollector(log, []string{"test"}, time.Second, "test-pod")
+       c.resolveBudget = 5 * time.Second
+       node := &databasev1.Node{
+               Metadata: &commonv1.Metadata{Name: "data-0"},
+               Roles:    []databasev1.Role{databasev1.Role_ROLE_DATA},
+       }
+       // fetchCurrentNodeFromEndpoint itself retries maxRetries(=3) times per poll, so
+       // failing the first 3 GetCurrentNode calls forces a second outer poll iteration.
+       fake := &fakeNodeQueryClient{node: node, failCount: maxRetries}
+       c.clients = []*endpointClient{{nodeQueryClient: fake, addr: "test"}}
+       role, _ := c.ResolveNodeRole(context.Background())
+       assert.Equal(t, databasev1.Role_name[int32(databasev1.Role_ROLE_DATA)], 
role)
+       assert.Greater(t, fake.calls, maxRetries)
+}
+
+func TestCollector_ResolveNodeRole_GivesUpWithinBudget(t *testing.T) {
+       log := initTestLogger(t)
+       c := NewCollector(log, []string{"test"}, time.Second, "test-pod")
+       c.resolveBudget = 500 * time.Millisecond
+       fake := &fakeNodeQueryClient{failCount: 1 << 30} // always fails
+       c.clients = []*endpointClient{{nodeQueryClient: fake, addr: "test"}}
+       start := time.Now()
+       role, _ := c.ResolveNodeRole(context.Background())
+       elapsed := time.Since(start)
+       assert.Equal(t, 
databasev1.Role_name[int32(databasev1.Role_ROLE_UNSPECIFIED)], role)
+       // Total runtime must stay bounded by the budget (plus slack for one in-flight gRPC
+       // attempt), proving the budget-scoped context actually caps the retry loop.
+       assert.Less(t, elapsed, c.resolveBudget+2*time.Second)
+}
+
 func TestProcessClusterStates_ComplexTopology(t *testing.T) {
        log := initTestLogger(t)
        collector := NewCollector(log, []string{"localhost:17914", 
"localhost:17915", "localhost:17916"}, 10*time.Second, "test-pod")
diff --git a/fodc/agent/internal/cmd/root.go b/fodc/agent/internal/cmd/root.go
index 3633a286c..9576e9427 100644
--- a/fodc/agent/internal/cmd/root.go
+++ b/fodc/agent/internal/cmd/root.go
@@ -273,7 +273,7 @@ func runFODC(_ *cobra.Command, _ []string) error {
        var nodeRole string
        var nodeLabels map[string]string
        if clusterCollector != nil {
-               nodeRole, nodeLabels = clusterCollector.GetNodeInfo()
+               nodeRole, nodeLabels = clusterCollector.ResolveNodeRole(ctx)
                log.Info().Str("node_role", nodeRole).Int("labels_count", 
len(nodeLabels)).Msg("Node info fetched")
        }
 

Reply via email to