This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch phase-2-cp5-march
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/phase-2-cp5-march by this push:
new a409991f6 fix(barrier): post-probe membership refresh + chunking
coverage (Copilot CLI review)
a409991f6 is described below
commit a409991f6b5b9a7f0b77026324f003f4ab569bd9
Author: Hongtao Gao <[email protected]>
AuthorDate: Tue May 5 09:39:15 2026 +0000
fix(barrier): post-probe membership refresh + chunking coverage (Copilot
CLI review)
Address findings from a `copilot --model gpt-5.4` review of the cumulative
phase-2-cp5-march branch:
- HIGH: race window between probe completion and timeout/cancel return.
Each iteration of the three awaitXxxCluster paths snapshotted membership
ONCE at the top, then built the timeout/cancel response from lastResults
without a final refresh. A member that flipped Active → Evictable after
probeMembers / probeKeyRevisions / probeAbsentKeys finished but before
the timeout branch fired was reported as a normal revision / missing-key
/ still-present laggard instead of as a one-time `evicted_during_poll`
laggard, and if it was the last blocked member the call returned
applied=false when applied=true was correct.
Fix: after every unsuccessful probe pass, re-run currentMembership() +
applyTransitions / applyTransitionsApplied(), append any newly-detected
eviction laggards, prune lastResults to surviving members via the new
pruneRevResults / pruneKeyResults / pruneStillPresentResults helpers,
then re-check the all-ready predicate (and the len(alive)==0 early-exit,
in all three paths) before falling through to the timeout/cancel branch.
- HIGH: unverified `GetRouteTable()` copy contract.
Comment in currentMembership claimed "the connection-pool layer
guarantees that GetRouteTable() returns a copy" without naming the
proving site. Verified at pkg/grpchelper/connmanager.go:308-333:
GetRouteTable holds the manager's RLock and returns a freshly allocated
*databasev1.RouteTable whose Active/Evictable slices are built per call.
Updated the docstring to cite the line so future readers don't have to
re-derive the contract.
- MEDIUM: missing test coverage.
- TestFanOut_AllNodesEvictedMidCall_ReturnsAppliedTrue: every frozen
member transitions to Evictable mid-call → applied=true with two
eviction laggards. Locks the post-probe `len(alive)==0` early-exit.
- TestAwaitSchemaApplied_ChunkingBoundaries: parameterised over key
counts {999, 1000, 1001, 2000}, verifying that the GetKeyRevisions call
count is a non-zero multiple of the expected per-iteration chunk count
and that the aggregated missing_keys has exactly as many entries as keys
were requested (no drop, no double-count). Pins the probeKeysOne
fencepost.
- TestFanOut_AllPeersUnimplemented_AllReady: multi-peer all-Unimplemented
case (the existing TestFanOut_NodeReturnsUnimplemented_TreatedAsReady
covers only one peer); regression-locks the cross-version policy across
the entire fan-out.
- TestFanOut_NilRouteTable_FallsBackToSelf: nil route tables on both
tiers + self present → applied=true via in-process self probe.
- TestFanOut_NilRouteTable_NoSelf_ReturnsUnavailable: nil route tables
with no self → codes.Unavailable (fail-fast).
- TestFanOut_MemberMissingFromStatusClients: member listed in tier
Active but absent from statusClients map → transient laggard for
each iteration → timeout response carries it as a laggard.
- minor: simplify two `if chunkEnd > len(keys) { chunkEnd = len(keys) }`
blocks to `chunkEnd := min(chunkStart + barrierKeyChunkSize, len(keys))`
per the govet minmax suggestion (Go 1.21+ builtin).
Lint clean; existing 30 cluster-fan-out tests stay green; 8 new tests
land green on first run.
Open items NOT addressed in this commit (left as discussion points for
CP-5 reviewers):
- Route-table read amplification under slow-convergence timeouts (≤72
GetRouteTable calls per timed-out 5s call); deferred to Step 2.7's
metrics phase to first measure rather than cache speculatively.
- The two #1111 commits (`3588e39c`, `3f7a6c75`) at the bottom of this
branch are still in the in-flight per-step PR; this branch will
rebase once #1111 merges.
via [HAPI](https://hapi.run)
---
banyand/liaison/grpc/barrier_cluster.go | 101 +++++++++++++--
banyand/liaison/grpc/barrier_cluster_test.go | 181 +++++++++++++++++++++++++++
2 files changed, 272 insertions(+), 10 deletions(-)
diff --git a/banyand/liaison/grpc/barrier_cluster.go
b/banyand/liaison/grpc/barrier_cluster.go
index 6370fbc92..e4d1edb1b 100644
--- a/banyand/liaison/grpc/barrier_cluster.go
+++ b/banyand/liaison/grpc/barrier_cluster.go
@@ -143,8 +143,11 @@ type membershipView struct {
// currentMembership reads tier1 and tier2 route tables and merges their
// Active and Evictable lists into name sets. Both providers contribute to
// both sets — dedup is by name, so a hybrid host appearing in both tiers
-// counts once. The connection-pool layer guarantees that GetRouteTable()
-// returns a copy, so this read is safe to call concurrently without locking.
+// counts once. ConnManager.GetRouteTable (pkg/grpchelper/connmanager.go:308)
+// holds the manager's RLock for the duration of the read and returns a
+// freshly allocated *databasev1.RouteTable whose Active/Evictable slices
+// are built per call, so concurrent invocation from the barrier loop is
+// race-free without additional locking at this layer.
func (b *barrierService) currentMembership() membershipView {
view := membershipView{
active: map[string]struct{}{},
@@ -174,6 +177,53 @@ func (b *barrierService) currentMembership()
membershipView {
return view
}
+// aliveSet returns a name-keyed set of the supplied alive slice for O(1)
+// post-probe filtering.
+func aliveSet(alive []member) map[string]struct{} {
+ out := make(map[string]struct{}, len(alive))
+ for _, m := range alive {
+ out[m.name] = struct{}{}
+ }
+ return out
+}
+
+// pruneRevResults filters probeResult slice to only entries whose member
+// is still in the alive set. Used by the post-probe membership refresh in
+// awaitRevisionAppliedCluster to avoid reporting an in-flight evicted
+// member as a normal revision laggard on the timeout/cancel return path.
+func pruneRevResults(results []probeResult, set map[string]struct{})
[]probeResult {
+ out := results[:0]
+ for _, r := range results {
+ if _, ok := set[r.member.name]; ok {
+ out = append(out, r)
+ }
+ }
+ return out
+}
+
+// pruneKeyResults is the keyProbeResult sibling of pruneRevResults.
+func pruneKeyResults(results []keyProbeResult, set map[string]struct{})
[]keyProbeResult {
+ out := results[:0]
+ for _, r := range results {
+ if _, ok := set[r.member.name]; ok {
+ out = append(out, r)
+ }
+ }
+ return out
+}
+
+// pruneStillPresentResults is the stillPresentResult sibling of
+// pruneRevResults.
+func pruneStillPresentResults(results []stillPresentResult, set
map[string]struct{}) []stillPresentResult {
+ out := results[:0]
+ for _, r := range results {
+ if _, ok := set[r.member.name]; ok {
+ out = append(out, r)
+ }
+ }
+ return out
+}
+
// applyTransitions walks the alive members against the current membership
// view and partitions them into "still alive" (kept for the next probe) and
// "newly evicted" (surfaced as one-time laggards with
reason="evicted_during_poll").
@@ -248,6 +298,19 @@ func (b *barrierService) awaitRevisionAppliedCluster(ctx
context.Context, req *s
if allReady(lastResults) {
return &schemav1.AwaitRevisionAppliedResponse{Applied:
true, Laggards: evictedLaggards}, nil
}
+ // Post-probe membership refresh: a member can transition
Active →
+ // Evictable between probe completion and the timeout/cancel
branch.
+ // Without this second snapshot the member would be reported as
a
+ // normal revision laggard instead of a one-time eviction
laggard,
+ // and if it was the last blocked member the call would return
+ // applied=false when applied=true is correct.
+ view = b.currentMembership()
+ alive, newlyEvicted = applyTransitions(alive, view, lastRev)
+ evictedLaggards = append(evictedLaggards, newlyEvicted...)
+ lastResults = pruneRevResults(lastResults, aliveSet(alive))
+ if len(alive) == 0 || allReady(lastResults) {
+ return &schemav1.AwaitRevisionAppliedResponse{Applied:
true, Laggards: evictedLaggards}, nil
+ }
if time.Now().After(deadline) {
return &schemav1.AwaitRevisionAppliedResponse{
Applied: false,
@@ -387,6 +450,19 @@ func (b *barrierService) awaitSchemaAppliedCluster(ctx
context.Context, req *sch
if allKeysApplied(lastResults) {
return &schemav1.AwaitSchemaAppliedResponse{Applied:
true, Laggards: evictedLaggards}, nil
}
+ // Post-probe membership refresh — see
awaitRevisionAppliedCluster
+ // for the rationale. Same race window applies to the per-key
+ // barrier: a member that flips Active → Evictable after the
+ // GetKeyRevisions probe finishes but before the timeout branch
+ // must surface as an eviction laggard, not as a missing-keys
+ // laggard.
+ view = b.currentMembership()
+ alive, newlyEvicted = applyTransitionsApplied(alive, view)
+ evictedLaggards = append(evictedLaggards, newlyEvicted...)
+ lastResults = pruneKeyResults(lastResults, aliveSet(alive))
+ if len(alive) == 0 || allKeysApplied(lastResults) {
+ return &schemav1.AwaitSchemaAppliedResponse{Applied:
true, Laggards: evictedLaggards}, nil
+ }
if time.Now().After(deadline) {
return &schemav1.AwaitSchemaAppliedResponse{
Applied: false,
@@ -499,10 +575,7 @@ func (b *barrierService) probeKeysOne(ctx context.Context,
m member, keys []*sch
var missing []*schemav1.SchemaKey
for chunkStart := 0; chunkStart < len(keys); chunkStart +=
barrierKeyChunkSize {
- chunkEnd := chunkStart + barrierKeyChunkSize
- if chunkEnd > len(keys) {
- chunkEnd = len(keys)
- }
+ chunkEnd := min(chunkStart+barrierKeyChunkSize, len(keys))
chunkKeys := keys[chunkStart:chunkEnd]
resp, rpcErr := statusClient.GetKeyRevisions(ctx,
&clusterv1.GetKeyRevisionsRequest{Keys: chunkKeys})
if rpcErr != nil {
@@ -578,6 +651,17 @@ func (b *barrierService) awaitSchemaDeletedCluster(ctx
context.Context, req *sch
if allKeysAbsent(lastResults) {
return &schemav1.AwaitSchemaDeletedResponse{Applied:
true, Laggards: evictedLaggards}, nil
}
+ // Post-probe membership refresh — see
awaitRevisionAppliedCluster.
+ // A member that flips Active → Evictable after GetAbsentKeys
+ // finishes must surface as an eviction laggard, not as a
+ // still-present-keys laggard.
+ view = b.currentMembership()
+ alive, newlyEvicted = applyTransitionsApplied(alive, view)
+ evictedLaggards = append(evictedLaggards, newlyEvicted...)
+ lastResults = pruneStillPresentResults(lastResults,
aliveSet(alive))
+ if len(alive) == 0 || allKeysAbsent(lastResults) {
+ return &schemav1.AwaitSchemaDeletedResponse{Applied:
true, Laggards: evictedLaggards}, nil
+ }
if time.Now().After(deadline) {
return &schemav1.AwaitSchemaDeletedResponse{
Applied: false,
@@ -656,10 +740,7 @@ func (b *barrierService) probeAbsentOne(ctx
context.Context, m member, keys []*s
var present []*schemav1.SchemaKey
for chunkStart := 0; chunkStart < len(keys); chunkStart +=
barrierKeyChunkSize {
- chunkEnd := chunkStart + barrierKeyChunkSize
- if chunkEnd > len(keys) {
- chunkEnd = len(keys)
- }
+ chunkEnd := min(chunkStart+barrierKeyChunkSize, len(keys))
chunkKeys := keys[chunkStart:chunkEnd]
resp, rpcErr := statusClient.GetAbsentKeys(ctx,
&clusterv1.GetAbsentKeysRequest{Keys: chunkKeys})
if rpcErr != nil {
diff --git a/banyand/liaison/grpc/barrier_cluster_test.go
b/banyand/liaison/grpc/barrier_cluster_test.go
index ce98f7c79..bea9a9c42 100644
--- a/banyand/liaison/grpc/barrier_cluster_test.go
+++ b/banyand/liaison/grpc/barrier_cluster_test.go
@@ -829,3 +829,184 @@ func
TestAwaitSchemaDeleted_FanOut_MixedOldAndNewSchemasOnSameKey(t *testing.T)
"every member's laggard entry must list k0 as still
present")
}
}
+
+// TestFanOut_AllNodesEvictedMidCall_ReturnsAppliedTrue verifies the
+// post-probe membership refresh's `len(alive) == 0` early-exit: if every
+// frozen member transitions to Evictable mid-call, the barrier surfaces the
+// eviction notices and returns Applied=true (the cluster has no remaining
+// objection to the target revision). Driven without a self entry so the
+// alive set is purely the two data nodes.
+func TestFanOut_AllNodesEvictedMidCall_ReturnsAppliedTrue(t *testing.T) {
+ cache := &staticBarrierCache{maxModRevision: 100}
+ tier2 := &fakeQueueClient{
+ routeTableFn: mutatingRouteTable(
+ &databasev1.RouteTable{Active: []string{"d1", "d2"}},
+ &databasev1.RouteTable{Evictable: []string{"d1",
"d2"}}, // both flip
+ ),
+ statusClients:
map[string]clusterv1.NodeSchemaStatusServiceClient{
+ "d1": &fakeNodeStatusClient{maxRev: 50}, // both behind
initially
+ "d2": &fakeNodeStatusClient{maxRev: 50},
+ },
+ }
+ svc := (&clusterFixture{cache: cache, tier1: newFakeTier(nil, nil),
tier2: tier2, self: ""}).build()
+
+ resp, err := svc.AwaitRevisionApplied(context.Background(),
&schemav1.AwaitRevisionAppliedRequest{
+ MinRevision: 100,
+ Timeout: durationpb.New(200 * time.Millisecond),
+ })
+ require.NoError(t, err)
+ assert.True(t, resp.GetApplied(),
+ "every frozen member evicted → cluster has no objection →
applied=true")
+ require.Len(t, resp.GetLaggards(), 2, "both members must surface as
eviction laggards")
+ for _, l := range resp.GetLaggards() {
+ assert.Equal(t, "evicted_during_poll", l.GetReason())
+ }
+}
+
+// TestFanOut_AllPeersUnimplemented_AllReady locks the cross-version policy
+// when every peer answers Unimplemented (a fully Phase-1 fleet under a
+// v0.13 liaison). The single-peer case is covered by
+// TestFanOut_NodeReturnsUnimplemented_TreatedAsReady; this test extends to
+// multiple peers + a data node so the "all-Unimplemented" branch is
+// regression-locked.
+func TestFanOut_AllPeersUnimplemented_AllReady(t *testing.T) {
+ cache := &staticBarrierCache{maxModRevision: 100}
+ legacy := func() *fakeNodeStatusClient {
+ return &fakeNodeStatusClient{err:
status.Error(codes.Unimplemented, "phase 1 node")}
+ }
+ tier1 := newFakeTier([]string{"peer-l1", "peer-l2"},
map[string]clusterv1.NodeSchemaStatusServiceClient{
+ "peer-l1": legacy(),
+ "peer-l2": legacy(),
+ })
+ tier2 := newFakeTier([]string{"phase1-data"},
map[string]clusterv1.NodeSchemaStatusServiceClient{
+ "phase1-data": legacy(),
+ })
+ svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: tier2, self:
"self-liaison"}).build()
+
+ resp, err := svc.AwaitRevisionApplied(context.Background(),
&schemav1.AwaitRevisionAppliedRequest{
+ MinRevision: 100,
+ Timeout: durationpb.New(50 * time.Millisecond),
+ })
+ require.NoError(t, err)
+ assert.True(t, resp.GetApplied(),
+ "every peer Unimplemented + self ready → applied=true (Phase-1
fleet treated as ready)")
+ assert.Empty(t, resp.GetLaggards())
+}
+
+// TestFanOut_NilRouteTable_FallsBackToSelf verifies that when both tier
+// queue clients return a nil *RouteTable from GetRouteTable, the snapshot
+// degenerates to {self} and the call converges via the in-process probe
+// without trying to dial any peer.
+func TestFanOut_NilRouteTable_FallsBackToSelf(t *testing.T) {
+ cache := &staticBarrierCache{maxModRevision: 100}
+ tier1 := &fakeQueueClient{routeTableFn: func() *databasev1.RouteTable {
return nil }}
+ tier2 := &fakeQueueClient{routeTableFn: func() *databasev1.RouteTable {
return nil }}
+ svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: tier2, self:
"self-liaison"}).build()
+
+ resp, err := svc.AwaitRevisionApplied(context.Background(),
&schemav1.AwaitRevisionAppliedRequest{
+ MinRevision: 100,
+ Timeout: durationpb.New(50 * time.Millisecond),
+ })
+ require.NoError(t, err)
+ assert.True(t, resp.GetApplied(), "nil route tables + self ready →
applied=true")
+ assert.Empty(t, resp.GetLaggards())
+}
+
+// TestFanOut_NilRouteTable_NoSelf_ReturnsUnavailable verifies that when
+// both tier clients return nil AND the receiving liaison has no curNode
+// (test-context fixture, ContextNodeKey unset), the call fails fast with
+// codes.Unavailable rather than blocking on an empty watched set.
+func TestFanOut_NilRouteTable_NoSelf_ReturnsUnavailable(t *testing.T) {
+ tier1 := &fakeQueueClient{routeTableFn: func() *databasev1.RouteTable {
return nil }}
+ tier2 := &fakeQueueClient{routeTableFn: func() *databasev1.RouteTable {
return nil }}
+ svc := (&clusterFixture{cache: &staticBarrierCache{}, tier1: tier1,
tier2: tier2, self: ""}).build()
+
+ resp, err := svc.AwaitRevisionApplied(context.Background(),
&schemav1.AwaitRevisionAppliedRequest{
+ MinRevision: 1,
+ Timeout: durationpb.New(20 * time.Millisecond),
+ })
+ require.Error(t, err)
+ assert.Nil(t, resp)
+ assert.Equal(t, codes.Unavailable, status.Code(err))
+}
+
+// TestFanOut_MemberMissingFromStatusClients verifies that a member listed
+// in tier Active but absent from the per-tier statusClients map (i.e. the
+// borrowed *grpc.ClientConn is unavailable for that node) is treated as a
+// transient laggard for this iteration only — same path as a generic
+// gRPC error — and surfaces in the timeout response.
+func TestFanOut_MemberMissingFromStatusClients(t *testing.T) {
+ cache := &staticBarrierCache{maxModRevision: 100}
+ tier2 := newFakeTier([]string{"missing-conn"}, nil) // active but no
client
+ svc := (&clusterFixture{cache: cache, tier1: newFakeTier(nil, nil),
tier2: tier2, self: "self-liaison"}).build()
+
+ resp, err := svc.AwaitRevisionApplied(context.Background(),
&schemav1.AwaitRevisionAppliedRequest{
+ MinRevision: 100,
+ Timeout: durationpb.New(60 * time.Millisecond),
+ })
+ require.NoError(t, err)
+ assert.False(t, resp.GetApplied(),
+ "member without a connection client must block convergence
until timeout")
+ require.Len(t, resp.GetLaggards(), 1)
+ assert.Equal(t, "data-missing-conn", resp.GetLaggards()[0].GetNode())
+}
+
+// TestAwaitSchemaApplied_ChunkingBoundaries pins the per-peer chunking
+// math at boundaries that the existing 1500-key test doesn't cover:
+// - 999 keys → 1 chunk (just under the threshold).
+// - 1000 keys → 1 chunk (exact boundary; second iteration must NOT fire).
+// - 1001 keys → 2 chunks (smallest second chunk).
+// - 2000 keys → 2 chunks (exact double).
+//
+// Each subtest verifies (a) the GetKeyRevisions call count per peer per
+// iteration matches the expected chunk count, and (b) every key shows up
+// exactly once in the aggregated missing_keys laggard. The peer state
+// reports every key absent so the call times out and the aggregated
+// missing_keys is checkable end-to-end.
+func TestAwaitSchemaApplied_ChunkingBoundaries(t *testing.T) {
+ cases := []struct {
+ name string
+ keys int
+ expectedChunks int32
+ }{
+ {"BelowThreshold", 999, 1},
+ {"ExactBoundary", 1000, 1},
+ {"OneOverThreshold", 1001, 2},
+ {"ExactDouble", 2000, 2},
+ }
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ keys := make([]*schemav1.SchemaKey, tc.keys)
+ for i := range tc.keys {
+ keys[i] = streamKey("g", "k"+strconv.Itoa(i))
+ }
+ cache := &staticBarrierCache{maxModRevision: 100} //
self has no keys → also missing
+ var calls int32
+ peer := &fakeNodeStatusClient{
+ keyState: map[string]fakeKeyState{}, //
peer has nothing → every key missing
+ keyRevsCalls: &calls,
+ }
+ tier1 := newFakeTier([]string{"peer"},
map[string]clusterv1.NodeSchemaStatusServiceClient{"peer": peer})
+ svc := (&clusterFixture{cache: cache, tier1: tier1,
tier2: newFakeTier(nil, nil), self: "self-liaison"}).build()
+
+ resp, err :=
svc.AwaitSchemaApplied(context.Background(),
&schemav1.AwaitSchemaAppliedRequest{
+ Keys: keys,
+ Timeout: durationpb.New(40 * time.Millisecond),
+ })
+ require.NoError(t, err)
+ assert.False(t, resp.GetApplied())
+
+ final := atomic.LoadInt32(&calls)
+ assert.Equal(t, int32(0), final%tc.expectedChunks,
+ "call count must be a multiple of expected
chunks per iteration (got %d, expected_per_iter=%d)",
+ final, tc.expectedChunks)
+ assert.GreaterOrEqual(t, final, tc.expectedChunks,
+ "at least one full iteration of chunks must
have run")
+
+ la := laggardByNode(resp.GetLaggards(), "liaison-peer")
+ require.NotNil(t, la, "peer must be a laggard")
+ assert.Len(t, la.GetMissingKeys(), tc.keys,
+ "missing_keys must aggregate every key across
chunks (no double-count, no drop)")
+ })
+ }
+}