This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch phase-2-cp5-march in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 9b083ecc9ef56cc28ed5ceccd11e67620e962b38 Author: Hongtao Gao <[email protected]> AuthorDate: Tue May 5 09:39:15 2026 +0000 fix(barrier): post-probe membership refresh + chunking coverage (Copilot CLI review) Address findings from a `copilot --model gpt-5.4` review of the cumulative phase-2-cp5-march branch: - HIGH: race window between probe completion and timeout/cancel return. Each iteration of the three awaitXxxCluster paths snapshotted membership ONCE at the top, then built the timeout/cancel response from lastResults without a final refresh. A member that flipped Active → Evictable after probeMembers / probeKeyRevisions / probeAbsentKeys finished but before the timeout branch fired was reported as a normal revision / missing-key / still-present laggard instead of as a one-time `evicted_during_poll` laggard, and if it was the last blocked member the call returned applied=false when applied=true was correct. Fix: after every unsuccessful probe pass, re-run currentMembership() + applyTransitions() / applyTransitionsApplied(), append any newly-detected eviction laggards, prune lastResults to surviving members via the new pruneRevResults / pruneKeyResults / pruneStillPresentResults helpers, then re-check the all-ready predicate (and the len(alive)==0 early-exit, on all three paths) before falling through to the timeout/cancel branch. - HIGH: unverified `GetRouteTable()` copy contract. The comment on currentMembership claimed "the connection-pool layer guarantees that GetRouteTable() returns a copy" without naming the proving site. Verified at pkg/grpchelper/connmanager.go:308-333: GetRouteTable holds the manager's RLock and returns a freshly allocated *databasev1.RouteTable whose Active/Evictable slices are built per call. Updated the docstring to cite the line so future readers don't have to re-derive the contract. - MEDIUM: missing test coverage.
- TestFanOut_AllNodesEvictedMidCall_ReturnsAppliedTrue: every frozen member transitions to Evictable mid-call → applied=true with two eviction laggards. Locks the post-probe `len(alive)==0` early-exit. - TestAwaitSchemaApplied_ChunkingBoundaries: parameterised over key counts {999, 1000, 1001, 2000}, verifying that the GetKeyRevisions call count is a non-zero multiple of the expected per-iteration chunk count and that the aggregated missing_keys has one entry per key. Pins the probeKeysOne fencepost. - TestFanOut_AllPeersUnimplemented_AllReady: multi-peer all-Unimplemented case (the existing _NodeReturnsUnimplemented_TreatedAsReady covers only one peer); regression-locks the cross-version policy across the entire fan-out. - TestFanOut_NilRouteTable_FallsBackToSelf: nil route tables on both tiers + self present → applied=true via in-process self probe. - TestFanOut_NilRouteTable_NoSelf_ReturnsUnavailable: nil route tables with no self → codes.Unavailable (fail-fast). - TestFanOut_MemberMissingFromStatusClients: member listed in tier Active but absent from statusClients map → transient laggard for each iteration → timeout response carries it as a laggard. - minor: simplify two `if chunkEnd > len(keys) { chunkEnd = len(keys) }` blocks to `chunkEnd := min(chunkStart+barrierKeyChunkSize, len(keys))` per the govet minmax suggestion (Go 1.21+ builtin). Lint clean; existing 30 cluster-fan-out tests stay green; 8 new tests land green on first run. Open items NOT addressed in this commit (left as discussion points for CP-5 reviewers): - Route-table read amplification under slow-convergence timeouts (≤72 GetRouteTable calls per timed-out 5s call); deferred to Step 2.7's metrics phase to measure first rather than cache speculatively. - The two #1111 commits (`3588e39c`, `3f7a6c75`) at the bottom of this branch are still in the in-flight per-step PR; this branch will rebase once #1111 merges.
via [HAPI](https://hapi.run) --- banyand/liaison/grpc/barrier_cluster.go | 101 +++++++++++++-- banyand/liaison/grpc/barrier_cluster_test.go | 181 +++++++++++++++++++++++++++ 2 files changed, 272 insertions(+), 10 deletions(-) diff --git a/banyand/liaison/grpc/barrier_cluster.go b/banyand/liaison/grpc/barrier_cluster.go index 6370fbc92..e4d1edb1b 100644 --- a/banyand/liaison/grpc/barrier_cluster.go +++ b/banyand/liaison/grpc/barrier_cluster.go @@ -143,8 +143,11 @@ type membershipView struct { // currentMembership reads tier1 and tier2 route tables and merges their // Active and Evictable lists into name sets. Both providers contribute to // both sets — dedup is by name, so a hybrid host appearing in both tiers -// counts once. The connection-pool layer guarantees that GetRouteTable() -// returns a copy, so this read is safe to call concurrently without locking. +// counts once. ConnManager.GetRouteTable (pkg/grpchelper/connmanager.go:308) +// holds the manager's RLock for the duration of the read and returns a +// freshly allocated *databasev1.RouteTable whose Active/Evictable slices +// are built per call, so concurrent invocation from the barrier loop is +// race-free without additional locking at this layer. func (b *barrierService) currentMembership() membershipView { view := membershipView{ active: map[string]struct{}{}, @@ -174,6 +177,53 @@ func (b *barrierService) currentMembership() membershipView { return view } +// aliveSet returns a name-keyed set of the supplied alive slice for O(1) +// post-probe filtering. +func aliveSet(alive []member) map[string]struct{} { + out := make(map[string]struct{}, len(alive)) + for _, m := range alive { + out[m.name] = struct{}{} + } + return out +} + +// pruneRevResults filters probeResult slice to only entries whose member +// is still in the alive set. 
Used by the post-probe membership refresh in +// awaitRevisionAppliedCluster to avoid reporting an in-flight evicted +// member as a normal revision laggard on the timeout/cancel return path. +func pruneRevResults(results []probeResult, set map[string]struct{}) []probeResult { + out := results[:0] + for _, r := range results { + if _, ok := set[r.member.name]; ok { + out = append(out, r) + } + } + return out +} + +// pruneKeyResults is the keyProbeResult sibling of pruneRevResults. +func pruneKeyResults(results []keyProbeResult, set map[string]struct{}) []keyProbeResult { + out := results[:0] + for _, r := range results { + if _, ok := set[r.member.name]; ok { + out = append(out, r) + } + } + return out +} + +// pruneStillPresentResults is the stillPresentResult sibling of +// pruneRevResults. +func pruneStillPresentResults(results []stillPresentResult, set map[string]struct{}) []stillPresentResult { + out := results[:0] + for _, r := range results { + if _, ok := set[r.member.name]; ok { + out = append(out, r) + } + } + return out +} + // applyTransitions walks the alive members against the current membership // view and partitions them into "still alive" (kept for the next probe) and // "newly evicted" (surfaced as one-time laggards with reason="evicted_during_poll"). @@ -248,6 +298,19 @@ func (b *barrierService) awaitRevisionAppliedCluster(ctx context.Context, req *s if allReady(lastResults) { return &schemav1.AwaitRevisionAppliedResponse{Applied: true, Laggards: evictedLaggards}, nil } + // Post-probe membership refresh: a member can transition Active → + // Evictable between probe completion and the timeout/cancel branch. + // Without this second snapshot the member would be reported as a + // normal revision laggard instead of a one-time eviction laggard, + // and if it was the last blocked member the call would return + // applied=false when applied=true is correct. 
+ view = b.currentMembership() + alive, newlyEvicted = applyTransitions(alive, view, lastRev) + evictedLaggards = append(evictedLaggards, newlyEvicted...) + lastResults = pruneRevResults(lastResults, aliveSet(alive)) + if len(alive) == 0 || allReady(lastResults) { + return &schemav1.AwaitRevisionAppliedResponse{Applied: true, Laggards: evictedLaggards}, nil + } if time.Now().After(deadline) { return &schemav1.AwaitRevisionAppliedResponse{ Applied: false, @@ -387,6 +450,19 @@ func (b *barrierService) awaitSchemaAppliedCluster(ctx context.Context, req *sch if allKeysApplied(lastResults) { return &schemav1.AwaitSchemaAppliedResponse{Applied: true, Laggards: evictedLaggards}, nil } + // Post-probe membership refresh — see awaitRevisionAppliedCluster + // for the rationale. Same race window applies to the per-key + // barrier: a member that flips Active → Evictable after the + // GetKeyRevisions probe finishes but before the timeout branch + // must surface as an eviction laggard, not as a missing-keys + // laggard. + view = b.currentMembership() + alive, newlyEvicted = applyTransitionsApplied(alive, view) + evictedLaggards = append(evictedLaggards, newlyEvicted...) 
+ lastResults = pruneKeyResults(lastResults, aliveSet(alive)) + if len(alive) == 0 || allKeysApplied(lastResults) { + return &schemav1.AwaitSchemaAppliedResponse{Applied: true, Laggards: evictedLaggards}, nil + } if time.Now().After(deadline) { return &schemav1.AwaitSchemaAppliedResponse{ Applied: false, @@ -499,10 +575,7 @@ func (b *barrierService) probeKeysOne(ctx context.Context, m member, keys []*sch var missing []*schemav1.SchemaKey for chunkStart := 0; chunkStart < len(keys); chunkStart += barrierKeyChunkSize { - chunkEnd := chunkStart + barrierKeyChunkSize - if chunkEnd > len(keys) { - chunkEnd = len(keys) - } + chunkEnd := min(chunkStart+barrierKeyChunkSize, len(keys)) chunkKeys := keys[chunkStart:chunkEnd] resp, rpcErr := statusClient.GetKeyRevisions(ctx, &clusterv1.GetKeyRevisionsRequest{Keys: chunkKeys}) if rpcErr != nil { @@ -578,6 +651,17 @@ func (b *barrierService) awaitSchemaDeletedCluster(ctx context.Context, req *sch if allKeysAbsent(lastResults) { return &schemav1.AwaitSchemaDeletedResponse{Applied: true, Laggards: evictedLaggards}, nil } + // Post-probe membership refresh — see awaitRevisionAppliedCluster. + // A member that flips Active → Evictable after GetAbsentKeys + // finishes must surface as an eviction laggard, not as a + // still-present-keys laggard. + view = b.currentMembership() + alive, newlyEvicted = applyTransitionsApplied(alive, view) + evictedLaggards = append(evictedLaggards, newlyEvicted...) 
+ lastResults = pruneStillPresentResults(lastResults, aliveSet(alive)) + if len(alive) == 0 || allKeysAbsent(lastResults) { + return &schemav1.AwaitSchemaDeletedResponse{Applied: true, Laggards: evictedLaggards}, nil + } if time.Now().After(deadline) { return &schemav1.AwaitSchemaDeletedResponse{ Applied: false, @@ -656,10 +740,7 @@ func (b *barrierService) probeAbsentOne(ctx context.Context, m member, keys []*s var present []*schemav1.SchemaKey for chunkStart := 0; chunkStart < len(keys); chunkStart += barrierKeyChunkSize { - chunkEnd := chunkStart + barrierKeyChunkSize - if chunkEnd > len(keys) { - chunkEnd = len(keys) - } + chunkEnd := min(chunkStart+barrierKeyChunkSize, len(keys)) chunkKeys := keys[chunkStart:chunkEnd] resp, rpcErr := statusClient.GetAbsentKeys(ctx, &clusterv1.GetAbsentKeysRequest{Keys: chunkKeys}) if rpcErr != nil { diff --git a/banyand/liaison/grpc/barrier_cluster_test.go b/banyand/liaison/grpc/barrier_cluster_test.go index ce98f7c79..bea9a9c42 100644 --- a/banyand/liaison/grpc/barrier_cluster_test.go +++ b/banyand/liaison/grpc/barrier_cluster_test.go @@ -829,3 +829,184 @@ func TestAwaitSchemaDeleted_FanOut_MixedOldAndNewSchemasOnSameKey(t *testing.T) "every member's laggard entry must list k0 as still present") } } + +// TestFanOut_AllNodesEvictedMidCall_ReturnsAppliedTrue verifies the +// post-probe membership refresh's `len(alive) == 0` early-exit: if every +// frozen member transitions to Evictable mid-call, the barrier surfaces the +// eviction notices and returns Applied=true (the cluster has no remaining +// objection to the target revision). Driven without a self entry so the +// alive set is purely the two data nodes. 
+func TestFanOut_AllNodesEvictedMidCall_ReturnsAppliedTrue(t *testing.T) { + cache := &staticBarrierCache{maxModRevision: 100} + tier2 := &fakeQueueClient{ + routeTableFn: mutatingRouteTable( + &databasev1.RouteTable{Active: []string{"d1", "d2"}}, + &databasev1.RouteTable{Evictable: []string{"d1", "d2"}}, // both flip + ), + statusClients: map[string]clusterv1.NodeSchemaStatusServiceClient{ + "d1": &fakeNodeStatusClient{maxRev: 50}, // both behind initially + "d2": &fakeNodeStatusClient{maxRev: 50}, + }, + } + svc := (&clusterFixture{cache: cache, tier1: newFakeTier(nil, nil), tier2: tier2, self: ""}).build() + + resp, err := svc.AwaitRevisionApplied(context.Background(), &schemav1.AwaitRevisionAppliedRequest{ + MinRevision: 100, + Timeout: durationpb.New(200 * time.Millisecond), + }) + require.NoError(t, err) + assert.True(t, resp.GetApplied(), + "every frozen member evicted → cluster has no objection → applied=true") + require.Len(t, resp.GetLaggards(), 2, "both members must surface as eviction laggards") + for _, l := range resp.GetLaggards() { + assert.Equal(t, "evicted_during_poll", l.GetReason()) + } +} + +// TestFanOut_AllPeersUnimplemented_AllReady locks the cross-version policy +// when every peer answers Unimplemented (a fully Phase-1 fleet under a +// v0.13 liaison). The single-peer case is covered by +// TestFanOut_NodeReturnsUnimplemented_TreatedAsReady; this test extends to +// multiple peers + a data node so the "all-Unimplemented" branch is +// regression-locked. 
+func TestFanOut_AllPeersUnimplemented_AllReady(t *testing.T) { + cache := &staticBarrierCache{maxModRevision: 100} + legacy := func() *fakeNodeStatusClient { + return &fakeNodeStatusClient{err: status.Error(codes.Unimplemented, "phase 1 node")} + } + tier1 := newFakeTier([]string{"peer-l1", "peer-l2"}, map[string]clusterv1.NodeSchemaStatusServiceClient{ + "peer-l1": legacy(), + "peer-l2": legacy(), + }) + tier2 := newFakeTier([]string{"phase1-data"}, map[string]clusterv1.NodeSchemaStatusServiceClient{ + "phase1-data": legacy(), + }) + svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: tier2, self: "self-liaison"}).build() + + resp, err := svc.AwaitRevisionApplied(context.Background(), &schemav1.AwaitRevisionAppliedRequest{ + MinRevision: 100, + Timeout: durationpb.New(50 * time.Millisecond), + }) + require.NoError(t, err) + assert.True(t, resp.GetApplied(), + "every peer Unimplemented + self ready → applied=true (Phase-1 fleet treated as ready)") + assert.Empty(t, resp.GetLaggards()) +} + +// TestFanOut_NilRouteTable_FallsBackToSelf verifies that when both tier +// queue clients return a nil *RouteTable from GetRouteTable, the snapshot +// degenerates to {self} and the call converges via the in-process probe +// without trying to dial any peer. 
+func TestFanOut_NilRouteTable_FallsBackToSelf(t *testing.T) { + cache := &staticBarrierCache{maxModRevision: 100} + tier1 := &fakeQueueClient{routeTableFn: func() *databasev1.RouteTable { return nil }} + tier2 := &fakeQueueClient{routeTableFn: func() *databasev1.RouteTable { return nil }} + svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: tier2, self: "self-liaison"}).build() + + resp, err := svc.AwaitRevisionApplied(context.Background(), &schemav1.AwaitRevisionAppliedRequest{ + MinRevision: 100, + Timeout: durationpb.New(50 * time.Millisecond), + }) + require.NoError(t, err) + assert.True(t, resp.GetApplied(), "nil route tables + self ready → applied=true") + assert.Empty(t, resp.GetLaggards()) +} + +// TestFanOut_NilRouteTable_NoSelf_ReturnsUnavailable verifies that when +// both tier clients return nil AND the receiving liaison has no curNode +// (test-context fixture, ContextNodeKey unset), the call fails fast with +// codes.Unavailable rather than blocking on an empty watched set. +func TestFanOut_NilRouteTable_NoSelf_ReturnsUnavailable(t *testing.T) { + tier1 := &fakeQueueClient{routeTableFn: func() *databasev1.RouteTable { return nil }} + tier2 := &fakeQueueClient{routeTableFn: func() *databasev1.RouteTable { return nil }} + svc := (&clusterFixture{cache: &staticBarrierCache{}, tier1: tier1, tier2: tier2, self: ""}).build() + + resp, err := svc.AwaitRevisionApplied(context.Background(), &schemav1.AwaitRevisionAppliedRequest{ + MinRevision: 1, + Timeout: durationpb.New(20 * time.Millisecond), + }) + require.Error(t, err) + assert.Nil(t, resp) + assert.Equal(t, codes.Unavailable, status.Code(err)) +} + +// TestFanOut_MemberMissingFromStatusClients verifies that a member listed +// in tier Active but absent from the per-tier statusClients map (i.e. the +// borrowed *grpc.ClientConn is unavailable for that node) is treated as a +// transient laggard for this iteration only — same path as a generic +// gRPC error — and surfaces in the timeout response. 
+func TestFanOut_MemberMissingFromStatusClients(t *testing.T) { + cache := &staticBarrierCache{maxModRevision: 100} + tier2 := newFakeTier([]string{"missing-conn"}, nil) // active but no client + svc := (&clusterFixture{cache: cache, tier1: newFakeTier(nil, nil), tier2: tier2, self: "self-liaison"}).build() + + resp, err := svc.AwaitRevisionApplied(context.Background(), &schemav1.AwaitRevisionAppliedRequest{ + MinRevision: 100, + Timeout: durationpb.New(60 * time.Millisecond), + }) + require.NoError(t, err) + assert.False(t, resp.GetApplied(), + "member without a connection client must block convergence until timeout") + require.Len(t, resp.GetLaggards(), 1) + assert.Equal(t, "data-missing-conn", resp.GetLaggards()[0].GetNode()) +} + +// TestAwaitSchemaApplied_ChunkingBoundaries pins the per-peer chunking +// math at boundaries that the existing 1500-key test doesn't cover: +// - 999 keys → 1 chunk (just under the threshold). +// - 1000 keys → 1 chunk (exact boundary; second iteration must NOT fire). +// - 1001 keys → 2 chunks (smallest second chunk). +// - 2000 keys → 2 chunks (exact double). +// +// Each subtest verifies (a) the GetKeyRevisions call count per peer per +// iteration matches the expected chunk count, and (b) every key shows up +// exactly once in the aggregated missing_keys laggard. The peer state +// reports every key absent so the call times out and the aggregated +// missing_keys is checkable end-to-end. 
+func TestAwaitSchemaApplied_ChunkingBoundaries(t *testing.T) { + cases := []struct { + name string + keys int + expectedChunks int32 + }{ + {"BelowThreshold", 999, 1}, + {"ExactBoundary", 1000, 1}, + {"OneOverThreshold", 1001, 2}, + {"ExactDouble", 2000, 2}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + keys := make([]*schemav1.SchemaKey, tc.keys) + for i := range tc.keys { + keys[i] = streamKey("g", "k"+strconv.Itoa(i)) + } + cache := &staticBarrierCache{maxModRevision: 100} // self has no keys → also missing + var calls int32 + peer := &fakeNodeStatusClient{ + keyState: map[string]fakeKeyState{}, // peer has nothing → every key missing + keyRevsCalls: &calls, + } + tier1 := newFakeTier([]string{"peer"}, map[string]clusterv1.NodeSchemaStatusServiceClient{"peer": peer}) + svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: newFakeTier(nil, nil), self: "self-liaison"}).build() + + resp, err := svc.AwaitSchemaApplied(context.Background(), &schemav1.AwaitSchemaAppliedRequest{ + Keys: keys, + Timeout: durationpb.New(40 * time.Millisecond), + }) + require.NoError(t, err) + assert.False(t, resp.GetApplied()) + + final := atomic.LoadInt32(&calls) + assert.Equal(t, int32(0), final%tc.expectedChunks, + "call count must be a multiple of expected chunks per iteration (got %d, expected_per_iter=%d)", + final, tc.expectedChunks) + assert.GreaterOrEqual(t, final, tc.expectedChunks, + "at least one full iteration of chunks must have run") + + la := laggardByNode(resp.GetLaggards(), "liaison-peer") + require.NotNil(t, la, "peer must be a laggard") + assert.Len(t, la.GetMissingKeys(), tc.keys, + "missing_keys must aggregate every key across chunks (no double-count, no drop)") + }) + } +}
