hanahmily commented on code in PR #1157:
URL:
https://github.com/apache/skywalking-banyandb/pull/1157#discussion_r3359729446
##########
fodc/agent/internal/cluster/collector.go:
##########
@@ -259,6 +265,47 @@ func (c *Collector) updateCurrentNodes(nodes
map[string]*databasev1.Node) {
c.log.Info().Int("nodes_count", len(nodes)).Msg("Updated current nodes from all endpoints")
}
+// pollCurrentNodeWithRetry repeatedly polls the current node until node info with a
+// resolved (non-unspecified) role is obtained, or the resolve budget elapses. The
+// sibling lifecycle/banyandb gRPC server may not be listening when the agent first
+// starts; the original single-shot poll left node_role permanently ROLE_UNSPECIFIED
+// whenever that brief startup race was lost.
+func (c *Collector) pollCurrentNodeWithRetry(ctx context.Context) {
+ deadline := time.Now().Add(c.resolveBudget)
+ backoff := initialBackoff
+ for {
+ c.pollCurrentNode(ctx)
+ if c.hasResolvedRole() {
+ return
+ }
+ if ctx.Err() != nil || !time.Now().Before(deadline) {
+ c.log.Warn().Msg("Node role unresolved after resolve budget; proceeding with current node info")
+ return
+ }
+ select {
+ case <-ctx.Done():
+ return
+ case <-c.closer.CloseNotify():
+ return
+ case <-time.After(backoff):
+ }
+ backoff = min(backoff*2, maxBackoff)
+ }
+}
Review Comment:
Good catch — fixed in 56b2e4d0. The retry no longer runs inside
`collectLoop`, so it can't delay `nodeFetchedCh`/`WaitForNodeFetched`. It now
lives in `Collector.ResolveNodeRole(ctx)`, which derives a
`context.WithTimeout(ctx, resolveBudget)` and passes that budget-scoped context
into every `pollCurrentNode`; the per-attempt gRPC timeouts and backoffs are
therefore capped by the overall budget, so total runtime can't exceed
`resolveBudget` (kept < the 30s `WaitForNodeFetched` timeout). `root.go` calls
`ResolveNodeRole` before snapshotting the role for the watchdog/proxy.
##########
fodc/agent/internal/cluster/collector_test.go:
##########
@@ -766,6 +768,54 @@ func TestProcessClusterStates_EmptyPodName(t *testing.T) {
}
}
+type fakeNodeQueryClient struct {
+ node *databasev1.Node
+ failCount int
+ calls int
+}
+
+func (f *fakeNodeQueryClient) GetCurrentNode(_ context.Context, _
*databasev1.GetCurrentNodeRequest,
+ _ ...grpc.CallOption,
+) (*databasev1.GetCurrentNodeResponse, error) {
+ f.calls++
+ if f.calls <= f.failCount {
+ return nil, fmt.Errorf("simulated unavailable on call %d",
f.calls)
+ }
+ return &databasev1.GetCurrentNodeResponse{Node: f.node}, nil
+}
+
+func TestCollector_PollCurrentNodeWithRetry_ResolvesAfterTransientFailure(t
*testing.T) {
+ log := initTestLogger(t)
+ c := NewCollector(log, []string{"test"}, time.Second, "test-pod")
+ c.resolveBudget = 5 * time.Second
+ node := &databasev1.Node{
+ Metadata: &commonv1.Metadata{Name: "data-0"},
+ Roles: []databasev1.Role{databasev1.Role_ROLE_DATA},
+ }
+ // fetchCurrentNodeFromEndpoint itself retries maxRetries(=3) times per poll, so
+ // failing the first 3 GetCurrentNode calls forces a second outer poll iteration.
+ fake := &fakeNodeQueryClient{node: node, failCount: maxRetries}
+ c.clients = []*endpointClient{{nodeQueryClient: fake, addr: "test"}}
+ c.pollCurrentNodeWithRetry(context.Background())
+ role, _ := c.GetNodeInfo()
+ assert.Equal(t, databasev1.Role_name[int32(databasev1.Role_ROLE_DATA)],
role)
+ assert.Greater(t, fake.calls, maxRetries)
+}
+
+func TestCollector_PollCurrentNodeWithRetry_GivesUpAfterBudget(t *testing.T) {
+ log := initTestLogger(t)
+ c := NewCollector(log, []string{"test"}, time.Second, "test-pod")
+ c.resolveBudget = 200 * time.Millisecond
+ fake := &fakeNodeQueryClient{failCount: 1 << 30} // always fails
+ c.clients = []*endpointClient{{nodeQueryClient: fake, addr: "test"}}
+ start := time.Now()
+ c.pollCurrentNodeWithRetry(context.Background())
+ elapsed := time.Since(start)
+ role, _ := c.GetNodeInfo()
+ assert.Equal(t,
databasev1.Role_name[int32(databasev1.Role_ROLE_UNSPECIFIED)], role)
+ assert.Less(t, elapsed, 10*time.Second) // returns promptly, does not hang
+}
Review Comment:
Done in 56b2e4d0 — the test (renamed
`TestCollector_ResolveNodeRole_GivesUpWithinBudget`) now asserts `elapsed <
c.resolveBudget + 2*time.Second` instead of a flat 10s, so a regression that
ignores the budget is caught.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]