hanahmily commented on code in PR #1157:
URL: 
https://github.com/apache/skywalking-banyandb/pull/1157#discussion_r3359729446


##########
fodc/agent/internal/cluster/collector.go:
##########
@@ -259,6 +265,47 @@ func (c *Collector) updateCurrentNodes(nodes map[string]*databasev1.Node) {
        c.log.Info().Int("nodes_count", len(nodes)).Msg("Updated current nodes from all endpoints")
 }
 
+// pollCurrentNodeWithRetry repeatedly polls the current node until node info with a
+// resolved (non-unspecified) role is obtained, or the resolve budget elapses. The
+// sibling lifecycle/banyandb gRPC server may not be listening when the agent first
+// starts; the original single-shot poll left node_role permanently ROLE_UNSPECIFIED
+// whenever that brief startup race was lost.
+func (c *Collector) pollCurrentNodeWithRetry(ctx context.Context) {
+       deadline := time.Now().Add(c.resolveBudget)
+       backoff := initialBackoff
+       for {
+               c.pollCurrentNode(ctx)
+               if c.hasResolvedRole() {
+                       return
+               }
+               if ctx.Err() != nil || !time.Now().Before(deadline) {
+                       c.log.Warn().Msg("Node role unresolved after resolve budget; proceeding with current node info")
+                       return
+               }
+               select {
+               case <-ctx.Done():
+                       return
+               case <-c.closer.CloseNotify():
+                       return
+               case <-time.After(backoff):
+               }
+               backoff = min(backoff*2, maxBackoff)
+       }
+}

Review Comment:
   Good catch — fixed in 56b2e4d0. The retry no longer runs inside 
`collectLoop`, so it can't delay `nodeFetchedCh`/`WaitForNodeFetched`. It now 
lives in `Collector.ResolveNodeRole(ctx)`, which derives a 
`context.WithTimeout(ctx, resolveBudget)` and passes that budget-scoped context 
into every `pollCurrentNode`; the per-attempt gRPC timeouts and backoffs are 
therefore capped by the overall budget, so total runtime can't exceed 
`resolveBudget` (kept < the 30s `WaitForNodeFetched` timeout). `root.go` calls 
`ResolveNodeRole` before snapshotting the role for the watchdog/proxy.



##########
fodc/agent/internal/cluster/collector_test.go:
##########
@@ -766,6 +768,54 @@ func TestProcessClusterStates_EmptyPodName(t *testing.T) {
        }
 }
 
+type fakeNodeQueryClient struct {
+       node      *databasev1.Node
+       failCount int
+       calls     int
+}
+
+func (f *fakeNodeQueryClient) GetCurrentNode(_ context.Context, _ *databasev1.GetCurrentNodeRequest,
+       _ ...grpc.CallOption,
+) (*databasev1.GetCurrentNodeResponse, error) {
+       f.calls++
+       if f.calls <= f.failCount {
+               return nil, fmt.Errorf("simulated unavailable on call %d", f.calls)
+       }
+       return &databasev1.GetCurrentNodeResponse{Node: f.node}, nil
+}
+
+func TestCollector_PollCurrentNodeWithRetry_ResolvesAfterTransientFailure(t *testing.T) {
+       log := initTestLogger(t)
+       c := NewCollector(log, []string{"test"}, time.Second, "test-pod")
+       c.resolveBudget = 5 * time.Second
+       node := &databasev1.Node{
+               Metadata: &commonv1.Metadata{Name: "data-0"},
+               Roles:    []databasev1.Role{databasev1.Role_ROLE_DATA},
+       }
+       // fetchCurrentNodeFromEndpoint itself retries maxRetries(=3) times per poll, so
+       // failing the first 3 GetCurrentNode calls forces a second outer poll iteration.
+       fake := &fakeNodeQueryClient{node: node, failCount: maxRetries}
+       c.clients = []*endpointClient{{nodeQueryClient: fake, addr: "test"}}
+       c.pollCurrentNodeWithRetry(context.Background())
+       role, _ := c.GetNodeInfo()
+       assert.Equal(t, databasev1.Role_name[int32(databasev1.Role_ROLE_DATA)], role)
+       assert.Greater(t, fake.calls, maxRetries)
+}
+
+func TestCollector_PollCurrentNodeWithRetry_GivesUpAfterBudget(t *testing.T) {
+       log := initTestLogger(t)
+       c := NewCollector(log, []string{"test"}, time.Second, "test-pod")
+       c.resolveBudget = 200 * time.Millisecond
+       fake := &fakeNodeQueryClient{failCount: 1 << 30} // always fails
+       c.clients = []*endpointClient{{nodeQueryClient: fake, addr: "test"}}
+       start := time.Now()
+       c.pollCurrentNodeWithRetry(context.Background())
+       elapsed := time.Since(start)
+       role, _ := c.GetNodeInfo()
+       assert.Equal(t, databasev1.Role_name[int32(databasev1.Role_ROLE_UNSPECIFIED)], role)
+       assert.Less(t, elapsed, 10*time.Second) // returns promptly, does not hang
+}

Review Comment:
   Done in 56b2e4d0 — the test (renamed 
`TestCollector_ResolveNodeRole_GivesUpWithinBudget`) now asserts `elapsed < 
c.resolveBudget + 2*time.Second` instead of a flat 10s, so a regression that 
ignores the budget is caught.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to