This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 64983ea0d Fix FODC agent leaving node_role unspecified on startup race (#1157)
64983ea0d is described below
commit 64983ea0dc095afa58e40477ef044c710339ea9d
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Jun 5 16:01:34 2026 +0800
Fix FODC agent leaving node_role unspecified on startup race (#1157)
* Fix FODC agent leaving node_role unspecified on startup race
The agent resolved the node role exactly once at startup via a single
GetCurrentNode poll whose endpoint retries spanned only ~1s. When the
sibling lifecycle/banyandb gRPC server was not yet listening
(connect: cannot assign requested address), the role fell back to
ROLE_UNSPECIFIED permanently, so 6 of 7 nodes reported the wrong type
instead of ROLE_DATA/ROLE_LIAISON.
---
CHANGES.md | 1 +
fodc/agent/internal/cluster/collector.go | 48 +++++++++++++++++++++++++
fodc/agent/internal/cluster/collector_test.go | 50 +++++++++++++++++++++++++++
fodc/agent/internal/cmd/root.go | 2 +-
4 files changed, 100 insertions(+), 1 deletion(-)
diff --git a/CHANGES.md b/CHANGES.md
index 2c4c52f9a..d2c866352 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -106,6 +106,7 @@ Release Notes.
- Trace storage metrics now expose the `storage` sub-scope, matching the
`stream_storage_*` naming. The `StorageMetricsFactory` for trace switched from
the root `trace` scope to `trace.storage`, so per-segment inverted-index
metrics (`inverted_index_total_updates`, `inverted_index_total_doc_count`,
`inverted_index_total_term_searchers_started`) are now emitted as
`banyandb_trace_storage_*` instead of `banyandb_trace_*`, aligning the
dashboard query names. Other trace metrics (`trace_tst_ [...]
- Fix flaky measure snapshot tests that gated on the part directory appearing
in `tab/` as the flush-completion signal. That directory is created by the
first line of `memPart.mustFlush`, before the mem→file introduction reaches the
in-memory snapshot and before the `.snp` manifest is persisted, so under
`-race`/CI load `TakeFileSnapshot` could observe only mem parts and `Close`
could drop the in-flight flush; gate on the persisted `.snp` manifest instead.
- Fix FODC proxy corrupting Prometheus metric types. The agent dropped the `#
TYPE` line while parsing banyandb `/metrics`, the `StreamMetrics` proto carried
no type field, and the proxy guessed the type from a name-suffix heuristic —
downgrading counters to gauge, mislabeling `_count`-suffixed counters as
histograms, and splitting summaries into two conflicting `# TYPE` lines.
Capture the type with the Prometheus `expfmt` parser, store it in the flight
recorder, thread it through a new [...]
+- Fix FODC agent labeling metrics with `node_role="ROLE_UNSPECIFIED"`. The
agent resolved the node role exactly once at startup via a single
`GetCurrentNode` poll whose endpoint retries spanned only ~1s; when the sibling
lifecycle/banyandb gRPC server was not yet listening (`connect: cannot assign
requested address`) the role fell back to `ROLE_UNSPECIFIED` permanently, so
most nodes never reported their real `ROLE_DATA`/`ROLE_LIAISON`. Retry the
initial node-role resolution with exponen [...]
### Chores
diff --git a/fodc/agent/internal/cluster/collector.go b/fodc/agent/internal/cluster/collector.go
index ba53c4e0d..42f60e3a0 100644
--- a/fodc/agent/internal/cluster/collector.go
+++ b/fodc/agent/internal/cluster/collector.go
@@ -39,6 +39,10 @@ const (
initialBackoff = 100 * time.Millisecond
maxBackoff = 5 * time.Second
nodeInfoFetchTimeout = 30 * time.Second
+ // nodeRoleResolveBudget bounds how long the initial node-role
resolution retries.
+ // Keep it below nodeInfoFetchTimeout so the poll finishes before
StartCollector's
+ // WaitForNodeFetched deadline.
+ nodeRoleResolveBudget = 25 * time.Second
)
// TopologyMap represents processed cluster data for a single endpoint.
@@ -66,6 +70,7 @@ type Collector struct {
podName string
addrs []string
interval time.Duration
+ resolveBudget time.Duration
mu sync.RWMutex
}
@@ -76,6 +81,7 @@ func NewCollector(log *logger.Logger, addrs []string, interval time.Duration, po
addrs: addrs,
interval: interval,
podName: podName,
+ resolveBudget: nodeRoleResolveBudget,
closer: run.NewCloser(0),
nodeFetchedCh: make(chan struct{}),
clients: make([]*endpointClient, 0, len(addrs)),
@@ -259,6 +265,48 @@ func (c *Collector) updateCurrentNodes(nodes map[string]*databasev1.Node) {
c.log.Info().Int("nodes_count", len(nodes)).Msg("Updated current nodes
from all endpoints")
}
+// ResolveNodeRole polls the current node until a resolved (non-unspecified) role is
+// obtained or resolveBudget elapses, then returns the role string and labels. All
+// per-attempt fetches share a budget-scoped context, so the total runtime is bounded by
+// resolveBudget regardless of the per-attempt gRPC timeouts and backoffs. This guards the
+// startup race where the sibling lifecycle/banyandb gRPC server is not yet listening when
+// the agent first polls, which previously left node_role permanently ROLE_UNSPECIFIED.
+// It does not touch nodeFetchedCh, so the collector's readiness signal stays prompt.
+func (c *Collector) ResolveNodeRole(ctx context.Context) (nodeRole string, nodeLabels map[string]string) {
+ budgetCtx, cancel := context.WithTimeout(ctx, c.resolveBudget)
+ defer cancel()
+ backoff := initialBackoff
+ for !c.hasResolvedRole() {
+ c.pollCurrentNode(budgetCtx)
+ if c.hasResolvedRole() || budgetCtx.Err() != nil || c.closer.Closed() {
+ break
+ }
+ select {
+ case <-budgetCtx.Done():
+ case <-c.closer.CloseNotify():
+ case <-time.After(backoff):
+ }
+ if budgetCtx.Err() != nil || c.closer.Closed() {
+ break
+ }
+ backoff = min(backoff*2, maxBackoff)
+ }
+ return c.GetNodeInfo()
+}
+
+// hasResolvedRole reports whether any current node has a known (non-unspecified) role.
+func (c *Collector) hasResolvedRole() bool {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ unspecified := databasev1.Role_name[int32(databasev1.Role_ROLE_UNSPECIFIED)]
+ for _, node := range c.currentNodes {
+ if node != nil && NodeRoleFromNode(node) != unspecified {
+ return true
+ }
+ }
+ return false
+}
+
func (c *Collector) updateClusterStates(states
map[string]*databasev1.GetClusterStateResponse) {
if len(states) == 0 {
c.log.Warn().Msg("Received empty cluster states map")
diff --git a/fodc/agent/internal/cluster/collector_test.go b/fodc/agent/internal/cluster/collector_test.go
index 4b127ad56..71ab0850b 100644
--- a/fodc/agent/internal/cluster/collector_test.go
+++ b/fodc/agent/internal/cluster/collector_test.go
@@ -19,11 +19,13 @@ package cluster
import (
"context"
+ "fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "google.golang.org/grpc"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -766,6 +768,54 @@ func TestProcessClusterStates_EmptyPodName(t *testing.T) {
}
}
+type fakeNodeQueryClient struct {
+ node *databasev1.Node
+ failCount int
+ calls int
+}
+
+func (f *fakeNodeQueryClient) GetCurrentNode(_ context.Context, _ *databasev1.GetCurrentNodeRequest,
+ _ ...grpc.CallOption,
+) (*databasev1.GetCurrentNodeResponse, error) {
+ f.calls++
+ if f.calls <= f.failCount {
+ return nil, fmt.Errorf("simulated unavailable on call %d", f.calls)
+ }
+ return &databasev1.GetCurrentNodeResponse{Node: f.node}, nil
+}
+
+func TestCollector_ResolveNodeRole_ResolvesAfterTransientFailure(t *testing.T) {
+ log := initTestLogger(t)
+ c := NewCollector(log, []string{"test"}, time.Second, "test-pod")
+ c.resolveBudget = 5 * time.Second
+ node := &databasev1.Node{
+ Metadata: &commonv1.Metadata{Name: "data-0"},
+ Roles: []databasev1.Role{databasev1.Role_ROLE_DATA},
+ }
+ // fetchCurrentNodeFromEndpoint itself retries maxRetries(=3) times per poll, so
+ // failing the first 3 GetCurrentNode calls forces a second outer poll iteration.
+ fake := &fakeNodeQueryClient{node: node, failCount: maxRetries}
+ c.clients = []*endpointClient{{nodeQueryClient: fake, addr: "test"}}
+ role, _ := c.ResolveNodeRole(context.Background())
+ assert.Equal(t, databasev1.Role_name[int32(databasev1.Role_ROLE_DATA)], role)
+ assert.Greater(t, fake.calls, maxRetries)
+}
+
+func TestCollector_ResolveNodeRole_GivesUpWithinBudget(t *testing.T) {
+ log := initTestLogger(t)
+ c := NewCollector(log, []string{"test"}, time.Second, "test-pod")
+ c.resolveBudget = 500 * time.Millisecond
+ fake := &fakeNodeQueryClient{failCount: 1 << 30} // always fails
+ c.clients = []*endpointClient{{nodeQueryClient: fake, addr: "test"}}
+ start := time.Now()
+ role, _ := c.ResolveNodeRole(context.Background())
+ elapsed := time.Since(start)
+ assert.Equal(t, databasev1.Role_name[int32(databasev1.Role_ROLE_UNSPECIFIED)], role)
+ // Total runtime must stay bounded by the budget (plus slack for one in-flight gRPC
+ // attempt), proving the budget-scoped context actually caps the retry loop.
+ assert.Less(t, elapsed, c.resolveBudget+2*time.Second)
+}
+
func TestProcessClusterStates_ComplexTopology(t *testing.T) {
log := initTestLogger(t)
collector := NewCollector(log, []string{"localhost:17914",
"localhost:17915", "localhost:17916"}, 10*time.Second, "test-pod")
diff --git a/fodc/agent/internal/cmd/root.go b/fodc/agent/internal/cmd/root.go
index 3633a286c..9576e9427 100644
--- a/fodc/agent/internal/cmd/root.go
+++ b/fodc/agent/internal/cmd/root.go
@@ -273,7 +273,7 @@ func runFODC(_ *cobra.Command, _ []string) error {
var nodeRole string
var nodeLabels map[string]string
if clusterCollector != nil {
- nodeRole, nodeLabels = clusterCollector.GetNodeInfo()
+ nodeRole, nodeLabels = clusterCollector.ResolveNodeRole(ctx)
log.Info().Str("node_role", nodeRole).Int("labels_count",
len(nodeLabels)).Msg("Node info fetched")
}