This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch phase-2-cp5-march
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 9b083ecc9ef56cc28ed5ceccd11e67620e962b38
Author: Hongtao Gao <[email protected]>
AuthorDate: Tue May 5 09:39:15 2026 +0000

    fix(barrier): post-probe membership refresh + chunking coverage (Copilot 
CLI review)
    
    Address findings from a `copilot --model gpt-5.4` review of the cumulative
    phase-2-cp5-march branch:
    
    - HIGH: race window between probe completion and timeout/cancel return.
      Each iteration of the three awaitXxxCluster paths snapshotted membership
      ONCE at the top, then built the timeout/cancel response from lastResults
      without a final refresh. A member that flipped Active → Evictable after
      probeMembers / probeKeyRevisions / probeAbsentKeys finished but before
      the timeout branch fired was reported as a normal revision / missing-key
      / still-present laggard instead of as a one-time `evicted_during_poll`
      laggard, and if it was the last blocked member the call returned
      applied=false when applied=true was correct.
    
      Fix: after every unsuccessful probe pass, re-run currentMembership() +
      applyTransitions / applyTransitionsApplied(), append any newly-detected
      eviction laggards, prune lastResults to surviving members via the new
      pruneRevResults / pruneKeyResults / pruneStillPresentResults helpers,
      then re-check the all-ready predicate (and the len(alive)==0 early-exit
      for the rev path) before falling through to the timeout/cancel branch.
    
    - HIGH: unverified `GetRouteTable()` copy contract.
      Comment in currentMembership claimed "the connection-pool layer
      guarantees that GetRouteTable() returns a copy" without naming the
      proving site. Verified at pkg/grpchelper/connmanager.go:308-333:
      GetRouteTable holds the manager's RLock and returns a freshly allocated
      *databasev1.RouteTable whose Active/Evictable slices are built per call.
      Updated the docstring to cite the line so future readers don't have to
      re-derive the contract.
    
    - MEDIUM: missing test coverage.
      - TestFanOut_AllNodesEvictedMidCall_ReturnsAppliedTrue: every frozen
        member transitions to Evictable mid-call → applied=true with two
        eviction laggards. Locks the post-probe `len(alive)==0` early-exit.
      - TestAwaitSchemaApplied_ChunkingBoundaries: parameterised over key
        counts {999, 1000, 1001, 2000} verifying GetKeyRevisions call count
        is the expected per-iteration chunk count and aggregated missing_keys
        has every key exactly once. Pins the probeKeysOne fencepost.
      - TestFanOut_AllPeersUnimplemented_AllReady: multi-peer all-Unimplemented
        case (the existing _NodeReturnsUnimplemented_TreatedAsReady covers
        only one peer); regression-locks the cross-version policy across the
        entire fan-out.
      - TestFanOut_NilRouteTable_FallsBackToSelf: nil route tables on both
        tiers + self present → applied=true via in-process self probe.
      - TestFanOut_NilRouteTable_NoSelf_ReturnsUnavailable: nil route tables
        with no self → codes.Unavailable (fail-fast).
      - TestFanOut_MemberMissingFromStatusClients: member listed in tier
        Active but absent from statusClients map → transient laggard for
        each iteration → timeout response carries it as a laggard.
    
    - minor: simplify two `if chunkEnd > len(keys) { chunkEnd = len(keys) }`
      blocks to `chunkEnd := min(chunkStart + barrierKeyChunkSize, len(keys))`
      per the govet minmax suggestion (Go 1.21+ builtin).
    
    Lint clean; existing 30 cluster-fan-out tests stay green; 8 new tests
    land green on first run.
    
    Open items NOT addressed in this commit (left as discussion points for
    CP-5 reviewers):
    - Route-table read amplification under slow-convergence timeouts (≤72
      GetRouteTable calls per timed-out 5s call); deferred to Step 2.7's
      metrics phase to first measure rather than cache speculatively.
    - The two #1111 commits (`3588e39c`, `3f7a6c75`) at the bottom of this
      branch are still in the in-flight per-step PR; this branch will
      rebase once #1111 merges.
    
    via [HAPI](https://hapi.run)
---
 banyand/liaison/grpc/barrier_cluster.go      | 101 +++++++++++++--
 banyand/liaison/grpc/barrier_cluster_test.go | 181 +++++++++++++++++++++++++++
 2 files changed, 272 insertions(+), 10 deletions(-)

diff --git a/banyand/liaison/grpc/barrier_cluster.go 
b/banyand/liaison/grpc/barrier_cluster.go
index 6370fbc92..e4d1edb1b 100644
--- a/banyand/liaison/grpc/barrier_cluster.go
+++ b/banyand/liaison/grpc/barrier_cluster.go
@@ -143,8 +143,11 @@ type membershipView struct {
 // currentMembership reads tier1 and tier2 route tables and merges their
 // Active and Evictable lists into name sets. Both providers contribute to
 // both sets — dedup is by name, so a hybrid host appearing in both tiers
-// counts once. The connection-pool layer guarantees that GetRouteTable()
-// returns a copy, so this read is safe to call concurrently without locking.
+// counts once. ConnManager.GetRouteTable (pkg/grpchelper/connmanager.go:308)
+// holds the manager's RLock for the duration of the read and returns a
+// freshly allocated *databasev1.RouteTable whose Active/Evictable slices
+// are built per call, so concurrent invocation from the barrier loop is
+// race-free without additional locking at this layer.
 func (b *barrierService) currentMembership() membershipView {
        view := membershipView{
                active:    map[string]struct{}{},
@@ -174,6 +177,53 @@ func (b *barrierService) currentMembership() 
membershipView {
        return view
 }
 
+// aliveSet returns a name-keyed set of the supplied alive slice for O(1)
+// post-probe filtering.
+func aliveSet(alive []member) map[string]struct{} {
+       out := make(map[string]struct{}, len(alive))
+       for _, m := range alive {
+               out[m.name] = struct{}{}
+       }
+       return out
+}
+
+// pruneRevResults filters probeResult slice to only entries whose member
+// is still in the alive set. Used by the post-probe membership refresh in
+// awaitRevisionAppliedCluster to avoid reporting an in-flight evicted
+// member as a normal revision laggard on the timeout/cancel return path.
+func pruneRevResults(results []probeResult, set map[string]struct{}) 
[]probeResult {
+       out := results[:0]
+       for _, r := range results {
+               if _, ok := set[r.member.name]; ok {
+                       out = append(out, r)
+               }
+       }
+       return out
+}
+
+// pruneKeyResults is the keyProbeResult sibling of pruneRevResults.
+func pruneKeyResults(results []keyProbeResult, set map[string]struct{}) 
[]keyProbeResult {
+       out := results[:0]
+       for _, r := range results {
+               if _, ok := set[r.member.name]; ok {
+                       out = append(out, r)
+               }
+       }
+       return out
+}
+
+// pruneStillPresentResults is the stillPresentResult sibling of
+// pruneRevResults.
+func pruneStillPresentResults(results []stillPresentResult, set 
map[string]struct{}) []stillPresentResult {
+       out := results[:0]
+       for _, r := range results {
+               if _, ok := set[r.member.name]; ok {
+                       out = append(out, r)
+               }
+       }
+       return out
+}
+
 // applyTransitions walks the alive members against the current membership
 // view and partitions them into "still alive" (kept for the next probe) and
 // "newly evicted" (surfaced as one-time laggards with 
reason="evicted_during_poll").
@@ -248,6 +298,19 @@ func (b *barrierService) awaitRevisionAppliedCluster(ctx 
context.Context, req *s
                if allReady(lastResults) {
                        return &schemav1.AwaitRevisionAppliedResponse{Applied: 
true, Laggards: evictedLaggards}, nil
                }
+               // Post-probe membership refresh: a member can transition 
Active →
+               // Evictable between probe completion and the timeout/cancel 
branch.
+               // Without this second snapshot the member would be reported as 
a
+               // normal revision laggard instead of a one-time eviction 
laggard,
+               // and if it was the last blocked member the call would return
+               // applied=false when applied=true is correct.
+               view = b.currentMembership()
+               alive, newlyEvicted = applyTransitions(alive, view, lastRev)
+               evictedLaggards = append(evictedLaggards, newlyEvicted...)
+               lastResults = pruneRevResults(lastResults, aliveSet(alive))
+               if len(alive) == 0 || allReady(lastResults) {
+                       return &schemav1.AwaitRevisionAppliedResponse{Applied: 
true, Laggards: evictedLaggards}, nil
+               }
                if time.Now().After(deadline) {
                        return &schemav1.AwaitRevisionAppliedResponse{
                                Applied:  false,
@@ -387,6 +450,19 @@ func (b *barrierService) awaitSchemaAppliedCluster(ctx 
context.Context, req *sch
                if allKeysApplied(lastResults) {
                        return &schemav1.AwaitSchemaAppliedResponse{Applied: 
true, Laggards: evictedLaggards}, nil
                }
+               // Post-probe membership refresh — see 
awaitRevisionAppliedCluster
+               // for the rationale. Same race window applies to the per-key
+               // barrier: a member that flips Active → Evictable after the
+               // GetKeyRevisions probe finishes but before the timeout branch
+               // must surface as an eviction laggard, not as a missing-keys
+               // laggard.
+               view = b.currentMembership()
+               alive, newlyEvicted = applyTransitionsApplied(alive, view)
+               evictedLaggards = append(evictedLaggards, newlyEvicted...)
+               lastResults = pruneKeyResults(lastResults, aliveSet(alive))
+               if len(alive) == 0 || allKeysApplied(lastResults) {
+                       return &schemav1.AwaitSchemaAppliedResponse{Applied: 
true, Laggards: evictedLaggards}, nil
+               }
                if time.Now().After(deadline) {
                        return &schemav1.AwaitSchemaAppliedResponse{
                                Applied:  false,
@@ -499,10 +575,7 @@ func (b *barrierService) probeKeysOne(ctx context.Context, 
m member, keys []*sch
 
        var missing []*schemav1.SchemaKey
        for chunkStart := 0; chunkStart < len(keys); chunkStart += 
barrierKeyChunkSize {
-               chunkEnd := chunkStart + barrierKeyChunkSize
-               if chunkEnd > len(keys) {
-                       chunkEnd = len(keys)
-               }
+               chunkEnd := min(chunkStart+barrierKeyChunkSize, len(keys))
                chunkKeys := keys[chunkStart:chunkEnd]
                resp, rpcErr := statusClient.GetKeyRevisions(ctx, 
&clusterv1.GetKeyRevisionsRequest{Keys: chunkKeys})
                if rpcErr != nil {
@@ -578,6 +651,17 @@ func (b *barrierService) awaitSchemaDeletedCluster(ctx 
context.Context, req *sch
                if allKeysAbsent(lastResults) {
                        return &schemav1.AwaitSchemaDeletedResponse{Applied: 
true, Laggards: evictedLaggards}, nil
                }
+               // Post-probe membership refresh — see 
awaitRevisionAppliedCluster.
+               // A member that flips Active → Evictable after GetAbsentKeys
+               // finishes must surface as an eviction laggard, not as a
+               // still-present-keys laggard.
+               view = b.currentMembership()
+               alive, newlyEvicted = applyTransitionsApplied(alive, view)
+               evictedLaggards = append(evictedLaggards, newlyEvicted...)
+               lastResults = pruneStillPresentResults(lastResults, 
aliveSet(alive))
+               if len(alive) == 0 || allKeysAbsent(lastResults) {
+                       return &schemav1.AwaitSchemaDeletedResponse{Applied: 
true, Laggards: evictedLaggards}, nil
+               }
                if time.Now().After(deadline) {
                        return &schemav1.AwaitSchemaDeletedResponse{
                                Applied:  false,
@@ -656,10 +740,7 @@ func (b *barrierService) probeAbsentOne(ctx 
context.Context, m member, keys []*s
 
        var present []*schemav1.SchemaKey
        for chunkStart := 0; chunkStart < len(keys); chunkStart += 
barrierKeyChunkSize {
-               chunkEnd := chunkStart + barrierKeyChunkSize
-               if chunkEnd > len(keys) {
-                       chunkEnd = len(keys)
-               }
+               chunkEnd := min(chunkStart+barrierKeyChunkSize, len(keys))
                chunkKeys := keys[chunkStart:chunkEnd]
                resp, rpcErr := statusClient.GetAbsentKeys(ctx, 
&clusterv1.GetAbsentKeysRequest{Keys: chunkKeys})
                if rpcErr != nil {
diff --git a/banyand/liaison/grpc/barrier_cluster_test.go 
b/banyand/liaison/grpc/barrier_cluster_test.go
index ce98f7c79..bea9a9c42 100644
--- a/banyand/liaison/grpc/barrier_cluster_test.go
+++ b/banyand/liaison/grpc/barrier_cluster_test.go
@@ -829,3 +829,184 @@ func 
TestAwaitSchemaDeleted_FanOut_MixedOldAndNewSchemasOnSameKey(t *testing.T)
                        "every member's laggard entry must list k0 as still 
present")
        }
 }
+
+// TestFanOut_AllNodesEvictedMidCall_ReturnsAppliedTrue verifies the
+// post-probe membership refresh's `len(alive) == 0` early-exit: if every
+// frozen member transitions to Evictable mid-call, the barrier surfaces the
+// eviction notices and returns Applied=true (the cluster has no remaining
+// objection to the target revision). Driven without a self entry so the
+// alive set is purely the two data nodes.
+func TestFanOut_AllNodesEvictedMidCall_ReturnsAppliedTrue(t *testing.T) {
+       cache := &staticBarrierCache{maxModRevision: 100}
+       tier2 := &fakeQueueClient{
+               routeTableFn: mutatingRouteTable(
+                       &databasev1.RouteTable{Active: []string{"d1", "d2"}},
+                       &databasev1.RouteTable{Evictable: []string{"d1", 
"d2"}}, // both flip
+               ),
+               statusClients: 
map[string]clusterv1.NodeSchemaStatusServiceClient{
+                       "d1": &fakeNodeStatusClient{maxRev: 50}, // both behind 
initially
+                       "d2": &fakeNodeStatusClient{maxRev: 50},
+               },
+       }
+       svc := (&clusterFixture{cache: cache, tier1: newFakeTier(nil, nil), 
tier2: tier2, self: ""}).build()
+
+       resp, err := svc.AwaitRevisionApplied(context.Background(), 
&schemav1.AwaitRevisionAppliedRequest{
+               MinRevision: 100,
+               Timeout:     durationpb.New(200 * time.Millisecond),
+       })
+       require.NoError(t, err)
+       assert.True(t, resp.GetApplied(),
+               "every frozen member evicted → cluster has no objection → 
applied=true")
+       require.Len(t, resp.GetLaggards(), 2, "both members must surface as 
eviction laggards")
+       for _, l := range resp.GetLaggards() {
+               assert.Equal(t, "evicted_during_poll", l.GetReason())
+       }
+}
+
+// TestFanOut_AllPeersUnimplemented_AllReady locks the cross-version policy
+// when every peer answers Unimplemented (a fully Phase-1 fleet under a
+// v0.13 liaison). The single-peer case is covered by
+// TestFanOut_NodeReturnsUnimplemented_TreatedAsReady; this test extends to
+// multiple peers + a data node so the "all-Unimplemented" branch is
+// regression-locked.
+func TestFanOut_AllPeersUnimplemented_AllReady(t *testing.T) {
+       cache := &staticBarrierCache{maxModRevision: 100}
+       legacy := func() *fakeNodeStatusClient {
+               return &fakeNodeStatusClient{err: 
status.Error(codes.Unimplemented, "phase 1 node")}
+       }
+       tier1 := newFakeTier([]string{"peer-l1", "peer-l2"}, 
map[string]clusterv1.NodeSchemaStatusServiceClient{
+               "peer-l1": legacy(),
+               "peer-l2": legacy(),
+       })
+       tier2 := newFakeTier([]string{"phase1-data"}, 
map[string]clusterv1.NodeSchemaStatusServiceClient{
+               "phase1-data": legacy(),
+       })
+       svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: tier2, self: 
"self-liaison"}).build()
+
+       resp, err := svc.AwaitRevisionApplied(context.Background(), 
&schemav1.AwaitRevisionAppliedRequest{
+               MinRevision: 100,
+               Timeout:     durationpb.New(50 * time.Millisecond),
+       })
+       require.NoError(t, err)
+       assert.True(t, resp.GetApplied(),
+               "every peer Unimplemented + self ready → applied=true (Phase-1 
fleet treated as ready)")
+       assert.Empty(t, resp.GetLaggards())
+}
+
+// TestFanOut_NilRouteTable_FallsBackToSelf verifies that when both tier
+// queue clients return a nil *RouteTable from GetRouteTable, the snapshot
+// degenerates to {self} and the call converges via the in-process probe
+// without trying to dial any peer.
+func TestFanOut_NilRouteTable_FallsBackToSelf(t *testing.T) {
+       cache := &staticBarrierCache{maxModRevision: 100}
+       tier1 := &fakeQueueClient{routeTableFn: func() *databasev1.RouteTable { 
return nil }}
+       tier2 := &fakeQueueClient{routeTableFn: func() *databasev1.RouteTable { 
return nil }}
+       svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: tier2, self: 
"self-liaison"}).build()
+
+       resp, err := svc.AwaitRevisionApplied(context.Background(), 
&schemav1.AwaitRevisionAppliedRequest{
+               MinRevision: 100,
+               Timeout:     durationpb.New(50 * time.Millisecond),
+       })
+       require.NoError(t, err)
+       assert.True(t, resp.GetApplied(), "nil route tables + self ready → 
applied=true")
+       assert.Empty(t, resp.GetLaggards())
+}
+
+// TestFanOut_NilRouteTable_NoSelf_ReturnsUnavailable verifies that when
+// both tier clients return nil AND the receiving liaison has no curNode
+// (test-context fixture, ContextNodeKey unset), the call fails fast with
+// codes.Unavailable rather than blocking on an empty watched set.
+func TestFanOut_NilRouteTable_NoSelf_ReturnsUnavailable(t *testing.T) {
+       tier1 := &fakeQueueClient{routeTableFn: func() *databasev1.RouteTable { 
return nil }}
+       tier2 := &fakeQueueClient{routeTableFn: func() *databasev1.RouteTable { 
return nil }}
+       svc := (&clusterFixture{cache: &staticBarrierCache{}, tier1: tier1, 
tier2: tier2, self: ""}).build()
+
+       resp, err := svc.AwaitRevisionApplied(context.Background(), 
&schemav1.AwaitRevisionAppliedRequest{
+               MinRevision: 1,
+               Timeout:     durationpb.New(20 * time.Millisecond),
+       })
+       require.Error(t, err)
+       assert.Nil(t, resp)
+       assert.Equal(t, codes.Unavailable, status.Code(err))
+}
+
+// TestFanOut_MemberMissingFromStatusClients verifies that a member listed
+// in tier Active but absent from the per-tier statusClients map (i.e. the
+// borrowed *grpc.ClientConn is unavailable for that node) is treated as a
+// transient laggard for this iteration only — same path as a generic
+// gRPC error — and surfaces in the timeout response.
+func TestFanOut_MemberMissingFromStatusClients(t *testing.T) {
+       cache := &staticBarrierCache{maxModRevision: 100}
+       tier2 := newFakeTier([]string{"missing-conn"}, nil) // active but no 
client
+       svc := (&clusterFixture{cache: cache, tier1: newFakeTier(nil, nil), 
tier2: tier2, self: "self-liaison"}).build()
+
+       resp, err := svc.AwaitRevisionApplied(context.Background(), 
&schemav1.AwaitRevisionAppliedRequest{
+               MinRevision: 100,
+               Timeout:     durationpb.New(60 * time.Millisecond),
+       })
+       require.NoError(t, err)
+       assert.False(t, resp.GetApplied(),
+               "member without a connection client must block convergence 
until timeout")
+       require.Len(t, resp.GetLaggards(), 1)
+       assert.Equal(t, "data-missing-conn", resp.GetLaggards()[0].GetNode())
+}
+
+// TestAwaitSchemaApplied_ChunkingBoundaries pins the per-peer chunking
+// math at boundaries that the existing 1500-key test doesn't cover:
+//   - 999 keys → 1 chunk (just under the threshold).
+//   - 1000 keys → 1 chunk (exact boundary; second iteration must NOT fire).
+//   - 1001 keys → 2 chunks (smallest second chunk).
+//   - 2000 keys → 2 chunks (exact double).
+//
+// Each subtest verifies (a) the GetKeyRevisions call count per peer per
+// iteration matches the expected chunk count, and (b) every key shows up
+// exactly once in the aggregated missing_keys laggard. The peer state
+// reports every key absent so the call times out and the aggregated
+// missing_keys is checkable end-to-end.
+func TestAwaitSchemaApplied_ChunkingBoundaries(t *testing.T) {
+       cases := []struct {
+               name           string
+               keys           int
+               expectedChunks int32
+       }{
+               {"BelowThreshold", 999, 1},
+               {"ExactBoundary", 1000, 1},
+               {"OneOverThreshold", 1001, 2},
+               {"ExactDouble", 2000, 2},
+       }
+       for _, tc := range cases {
+               t.Run(tc.name, func(t *testing.T) {
+                       keys := make([]*schemav1.SchemaKey, tc.keys)
+                       for i := range tc.keys {
+                               keys[i] = streamKey("g", "k"+strconv.Itoa(i))
+                       }
+                       cache := &staticBarrierCache{maxModRevision: 100} // 
self has no keys → also missing
+                       var calls int32
+                       peer := &fakeNodeStatusClient{
+                               keyState:     map[string]fakeKeyState{}, // 
peer has nothing → every key missing
+                               keyRevsCalls: &calls,
+                       }
+                       tier1 := newFakeTier([]string{"peer"}, 
map[string]clusterv1.NodeSchemaStatusServiceClient{"peer": peer})
+                       svc := (&clusterFixture{cache: cache, tier1: tier1, 
tier2: newFakeTier(nil, nil), self: "self-liaison"}).build()
+
+                       resp, err := 
svc.AwaitSchemaApplied(context.Background(), 
&schemav1.AwaitSchemaAppliedRequest{
+                               Keys:    keys,
+                               Timeout: durationpb.New(40 * time.Millisecond),
+                       })
+                       require.NoError(t, err)
+                       assert.False(t, resp.GetApplied())
+
+                       final := atomic.LoadInt32(&calls)
+                       assert.Equal(t, int32(0), final%tc.expectedChunks,
+                               "call count must be a multiple of expected 
chunks per iteration (got %d, expected_per_iter=%d)",
+                               final, tc.expectedChunks)
+                       assert.GreaterOrEqual(t, final, tc.expectedChunks,
+                               "at least one full iteration of chunks must 
have run")
+
+                       la := laggardByNode(resp.GetLaggards(), "liaison-peer")
+                       require.NotNil(t, la, "peer must be a laggard")
+                       assert.Len(t, la.GetMissingKeys(), tc.keys,
+                               "missing_keys must aggregate every key across 
chunks (no double-count, no drop)")
+               })
+       }
+}

Reply via email to