This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch phase-2-cp5-march
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit e552d181d3001a73685ada447b67bd1347950c05
Author: Hongtao Gao <[email protected]>
AuthorDate: Mon May 4 06:19:46 2026 +0000

    feat(barrier): mid-call eviction / leave / late-join handling (Phase 2 §SS-1..SS-4)

    Phase 2 Step 2.2 snapshot-semantics: extend the cluster fan-out from
    #1111 with the four frozen-snapshot rules from the plan §5.0 sequence
    diagram. Implements stories SS-1, SS-2, SS-3, SS-4 toward CP-5.

    SS-1 — NodeLaggard.reason proto field
      api/proto/banyandb/schema/v1/barrier.proto adds `string reason = 5;`
      to NodeLaggard, with a docstring covering the `evicted_during_poll`
      use case. Additive proto change — no wire-level breakage.
      docs/api-reference.md regen reflects the new field; barrier.pb.go is
      gitignored and regens on every build.

    SS-2 — Mid-call eviction (Active → Evictable)
      barrier_cluster.go now re-reads tier1 + tier2 route tables each
      iteration via currentMembership() and walks the alive set against
      the new view in applyTransitions(). Members that transitioned to
      Evictable are dropped from subsequent probes and recorded once in
      evictedLaggards with Reason="evicted_during_poll" carrying the
      last-observed mod_revision (sourced from lastRev[name], populated
      each iteration after probing).

      evictedLaggards accumulate across iterations and are surfaced
      regardless of overall outcome — applied=true responses include the
      eviction notices alongside an empty `revisionLaggards`, so callers
      can see what happened to the cluster mid-call.

    SS-3 — Mid-call leave (Active → Removed)
      applyTransitions drops members whose name disappears from BOTH
      active and evictable sets, without recording a laggard. Per the
      plan: a node that's gone from the cluster shouldn't block a barrier
      on its absence.

    SS-4 — Late-join exclusion
      The frozen `members` slice from snapshotMembers() is the loop's
      authority. applyTransitions never grows the alive set; nodes that
      enter Active after the snapshot are simply ignored. Late joiners
      with max_revision=0 don't cause spurious timeouts; they show up in
      subsequent calls.

    Tests (barrier_cluster_test.go):
    - fakeNodeStatusClient grows a `revs []int64` field so a fake can
      return different revisions per probe (call k → revs[min(k,len-1)])
      — needed to simulate a member catching up across iterations.
    - fakeQueueClient grows a `routeTableFn func()` override so a
      frozen-snapshot test can mutate cluster membership between
      iterations deterministically (counter-based, no goroutine racing).
    - mutatingRouteTable helper returns `first` for the first two
      GetRouteTable calls (snapshot + iter 1) and `rest` thereafter —
      matches the production assumption that eviction follows at least
      one probe of the member.
    - TestFanOut_NodeEvictedMidWait_DropsAndAnnotates: peer-A starts in
      Active at rev=50, transitions to Evictable on iter 2; barrier
      returns Applied=true with one laggard {Node="liaison-peer-A",
      Reason="evicted_during_poll", CurrentModRevision=50}.
    - TestFanOut_NodeSetChangesMidWait_SkipsDepartedNodes: peer-A simply
      vanishes from the route table on iter 2; barrier converges with no
      laggard for it.
    - TestFanOut_LateJoiner_Excluded: peer-B joins Active on iter 2 but is
      not in the frozen snapshot; barrier converges based on peer-A only
      and peer-B is never probed (its absence from statusClients would
      have surfaced as a laggard if it had been).

    All 11 fanout unit tests pass; existing barrier_test.go tests (Phase 1
    single-cache path) remain green; lint clean.

    via [HAPI](https://hapi.run)
---
 api/proto/banyandb/schema/v1/barrier.proto   |  10 +-
 banyand/liaison/grpc/barrier_cluster.go      | 117 +++++++++++++++++++++--
 banyand/liaison/grpc/barrier_cluster_test.go | 138 ++++++++++++++++++++++++++-
 docs/api-reference.md                        |  10 +-
 4 files changed, 259 insertions(+), 16 deletions(-)

diff --git a/api/proto/banyandb/schema/v1/barrier.proto b/api/proto/banyandb/schema/v1/barrier.proto
index 8150d3051..5821fdecc 100644
--- a/api/proto/banyandb/schema/v1/barrier.proto
+++ b/api/proto/banyandb/schema/v1/barrier.proto
@@ -55,14 +55,18 @@ message AwaitRevisionAppliedRequest {
   google.protobuf.Duration timeout = 2;
 }
 
-// NodeLaggard reports a single data node that has not caught up to the
-// requested schema state. missing_keys is populated by AwaitSchemaApplied
-// responses; still_present_keys is populated by AwaitSchemaDeleted responses.
+// NodeLaggard reports a single cluster member (peer liaison or data node)
+// that has not caught up to the requested schema state. missing_keys is
+// populated by AwaitSchemaApplied responses; still_present_keys by
+// AwaitSchemaDeleted responses. reason is set when the laggard exists for a
+// non-default cause (e.g. "evicted_during_poll" when the cluster transitioned
+// the member from Active to Evictable mid-call); empty otherwise.
 message NodeLaggard {
   string node = 1;
   int64 current_mod_revision = 2;
   repeated SchemaKey missing_keys = 3;
   repeated SchemaKey still_present_keys = 4;
+  string reason = 5;
 }
 
 // AwaitRevisionAppliedResponse reports whether every node reached the target
diff --git a/banyand/liaison/grpc/barrier_cluster.go b/banyand/liaison/grpc/barrier_cluster.go
index 8cfab2938..3cbbae78e 100644
--- a/banyand/liaison/grpc/barrier_cluster.go
+++ b/banyand/liaison/grpc/barrier_cluster.go
@@ -124,13 +124,98 @@ type probeResult struct {
 	ready bool
 }
 
+// evictedLaggardReason is the laggard.reason value attached to a member that
+// transitioned Active → Evictable while a barrier call was in flight. The
+// cluster has already decided the node is unreliable; the barrier defers to
+// that decision rather than try to override it, but surfaces the eviction
+// once so callers see why the watched set shrank mid-call.
+const evictedLaggardReason = "evicted_during_poll"
+
+// membershipView is a snapshot of the tier1 + tier2 route tables collapsed
+// into name-keyed sets. The barrier loop refreshes this each iteration to
+// detect Active → Evictable / Active → Removed transitions on the frozen
+// member set built at call start.
+type membershipView struct {
+	active    map[string]struct{}
+	evictable map[string]struct{}
+}
+
+// currentMembership reads tier1 and tier2 route tables and merges their
+// Active and Evictable lists into name sets. Both providers contribute to
+// both sets — dedup is by name, so a hybrid host appearing in both tiers
+// counts once. The connection-pool layer guarantees that GetRouteTable()
+// returns a copy, so this read is safe to call concurrently without locking.
+func (b *barrierService) currentMembership() membershipView {
+	view := membershipView{
+		active:    map[string]struct{}{},
+		evictable: map[string]struct{}{},
+	}
+	addFromTier := func(provider func() queue.Client) {
+		if provider == nil {
+			return
+		}
+		client := provider()
+		if client == nil {
+			return
+		}
+		rt := client.GetRouteTable()
+		if rt == nil {
+			return
+		}
+		for _, n := range rt.GetActive() {
+			view.active[n] = struct{}{}
+		}
+		for _, n := range rt.GetEvictable() {
+			view.evictable[n] = struct{}{}
+		}
+	}
+	addFromTier(b.peerLiaisons)
+	addFromTier(b.dataNodes)
+	return view
+}
+
+// applyTransitions walks the alive members against the current membership
+// view and partitions them into "still alive" (kept for the next probe) and
+// "newly evicted" (surfaced as one-time laggards with reason="evicted_during_poll").
+// Members whose names disappear from BOTH active and evictable are treated
+// as having left the cluster (graceful leave) and are dropped without a
+// laggard entry, per the frozen-snapshot policy. Self is always kept — the
+// in-process member never transitions.
+func applyTransitions(alive []member, view membershipView, lastRev map[string]int64) (kept []member, newlyEvicted []*schemav1.NodeLaggard) {
+	for _, m := range alive {
+		if m.isSelf {
+			kept = append(kept, m)
+			continue
+		}
+		if _, evicted := view.evictable[m.name]; evicted {
+			newlyEvicted = append(newlyEvicted, &schemav1.NodeLaggard{
+				Node:               m.laggardName(),
+				CurrentModRevision: lastRev[m.name],
+				Reason:             evictedLaggardReason,
+			})
+			continue
+		}
+		if _, active := view.active[m.name]; !active {
+			// Absent from both sets: graceful leave. Drop silently per
+			// the frozen-snapshot leave-during-poll branch.
+			continue
+		}
+		kept = append(kept, m)
+	}
+	return kept, newlyEvicted
+}
+
 // awaitRevisionAppliedCluster runs the cluster-wide fan-out for
 // AwaitRevisionApplied. The receiving liaison's own cache is probed
 // in-process; peers are probed via clusterv1.NodeSchemaStatusService over the
-// *grpc.ClientConn borrowed from queue.Client.
+// *grpc.ClientConn borrowed from queue.Client. The frozen watched set built
+// at call start is shrunk mid-call when members transition Active → Evictable
+// (recorded as one-time `evicted_during_poll` laggards) or Active → Removed
+// (silent leave). Late joiners — nodes that enter Active after the snapshot
+// — are excluded; they show up in subsequent calls.
 func (b *barrierService) awaitRevisionAppliedCluster(ctx context.Context, req *schemav1.AwaitRevisionAppliedRequest) (*schemav1.AwaitRevisionAppliedResponse, error) {
-	members := b.snapshotMembers()
-	if len(members) == 0 {
+	frozen := b.snapshotMembers()
+	if len(frozen) == 0 {
 		return nil, status.Errorf(codes.Unavailable, "no active cluster members")
 	}
 
@@ -138,17 +223,35 @@ func (b *barrierService) awaitRevisionAppliedCluster(ctx context.Context, req *s
 	pollCtx, cancel := context.WithDeadline(ctx, deadline)
 	defer cancel()
 
+	alive := frozen
+	lastRev := map[string]int64{}
+	var evictedLaggards []*schemav1.NodeLaggard
 	interval := barrierInitInterval
 	var lastResults []probeResult
 	for {
-		lastResults = b.probeMembers(pollCtx, members, req.GetMinRevision(), deadline)
+		view := b.currentMembership()
+		var newlyEvicted []*schemav1.NodeLaggard
+		alive, newlyEvicted = applyTransitions(alive, view, lastRev)
+		evictedLaggards = append(evictedLaggards, newlyEvicted...)
+
+		if len(alive) == 0 {
+			// Every frozen member departed mid-call. The remaining cluster
+			// has no objection to the target revision; surface the eviction
+			// notices and return applied=true so callers can act.
+			return &schemav1.AwaitRevisionAppliedResponse{Applied: true, Laggards: evictedLaggards}, nil
+		}
+
+		lastResults = b.probeMembers(pollCtx, alive, req.GetMinRevision(), deadline)
+		for _, r := range lastResults {
+			lastRev[r.member.name] = r.rev
+		}
 		if allReady(lastResults) {
-			return &schemav1.AwaitRevisionAppliedResponse{Applied: true}, nil
+			return &schemav1.AwaitRevisionAppliedResponse{Applied: true, Laggards: evictedLaggards}, nil
 		}
 		if time.Now().After(deadline) {
 			return &schemav1.AwaitRevisionAppliedResponse{
 				Applied:  false,
-				Laggards: revisionLaggards(lastResults),
+				Laggards: append(revisionLaggards(lastResults), evictedLaggards...),
 			}, nil
 		}
 		select {
@@ -156,7 +259,7 @@ func (b *barrierService) awaitRevisionAppliedCluster(ctx context.Context, req *s
 		case <-pollCtx.Done():
 			return &schemav1.AwaitRevisionAppliedResponse{
 				Applied:  false,
-				Laggards: revisionLaggards(lastResults),
+				Laggards: append(revisionLaggards(lastResults), evictedLaggards...),
 			}, nil
 		}
 		interval = barrierBackoff(interval)
diff --git a/banyand/liaison/grpc/barrier_cluster_test.go b/banyand/liaison/grpc/barrier_cluster_test.go
index 21a828d7c..adcad0921 100644
--- a/banyand/liaison/grpc/barrier_cluster_test.go
+++ b/banyand/liaison/grpc/barrier_cluster_test.go
@@ -40,17 +40,22 @@ import (
 
 // fakeNodeStatusClient is a minimal clusterv1.NodeSchemaStatusServiceClient.
 // Only GetMaxRevision is implemented for Phase 2.2 tests; the other RPCs
-// land in 2.3/2.4 and panic if a test accidentally invokes them.
+// land in 2.3/2.4 and panic if a test accidentally invokes them. revs lets
+// frozen-snapshot tests vary the returned revision per call (call k returns
+// revs[min(k, len-1)]) so the barrier can observe a member catching up
+// across iterations.
 type fakeNodeStatusClient struct {
 	err      error
 	callsRef *int32
+	revs     []int64
 	maxRev   int64
 	delay    time.Duration
 }
 
 func (f *fakeNodeStatusClient) GetMaxRevision(ctx context.Context, _ *clusterv1.GetMaxRevisionRequest, _ ...grpc.CallOption) (*clusterv1.GetMaxRevisionResponse, error) {
+	n := int32(0)
 	if f.callsRef != nil {
-		atomic.AddInt32(f.callsRef, 1)
+		n = atomic.AddInt32(f.callsRef, 1) - 1
 	}
 	if f.delay > 0 {
 		select {
@@ -62,6 +67,13 @@ func (f *fakeNodeStatusClient) GetMaxRevision(ctx context.Context, _ *clusterv1.
 	if f.err != nil {
 		return nil, f.err
 	}
+	if len(f.revs) > 0 {
+		idx := int(n)
+		if idx >= len(f.revs) {
+			idx = len(f.revs) - 1
+		}
+		return &clusterv1.GetMaxRevisionResponse{MaxModRevision: f.revs[idx]}, nil
+	}
 	return &clusterv1.GetMaxRevisionResponse{MaxModRevision: f.maxRev}, nil
 }
 
@@ -75,14 +87,19 @@ func (*fakeNodeStatusClient) GetAbsentKeys(_ context.Context, _ *clusterv1.GetAb
 
 // fakeQueueClient embeds queue.Client (a nil interface) so unused methods
 // panic at runtime; only the two methods the barrier fan-out actually calls
-// are overridden.
+// are overridden. routeTableFn (when set) overrides routeTable per call so
+// frozen-snapshot tests can mutate cluster membership between iterations.
 type fakeQueueClient struct {
 	queue.Client
 	routeTable    *databasev1.RouteTable
+	routeTableFn  func() *databasev1.RouteTable
 	statusClients map[string]clusterv1.NodeSchemaStatusServiceClient
 }
 
 func (f *fakeQueueClient) GetRouteTable() *databasev1.RouteTable {
+	if f.routeTableFn != nil {
+		return f.routeTableFn()
+	}
 	if f.routeTable == nil {
 		return &databasev1.RouteTable{}
 	}
@@ -319,3 +336,118 @@ func TestFanOut_NodeReturnsUnimplemented_TreatedAsReady(t *testing.T) {
 		"Unimplemented from a Phase-1 peer must not block a v0.13 barrier caller")
 	assert.Empty(t, resp.GetLaggards())
 }
+
+// mutatingRouteTable returns a closure that produces `first` for the first
+// two GetRouteTable calls and `rest` for every call thereafter. The two-call
+// threshold lines up with the production sequence: snapshotMembers consumes
+// call 1 (initial freeze), iteration 1's currentMembership consumes call 2,
+// and iteration 2 onwards observes the mutated state. This means the first
+// probe round runs against `first` (so per-member revs are recorded into
+// lastRev) before the transition fires — matching the production assumption
+// that eviction follows at least one probe of the member.
+func mutatingRouteTable(first, rest *databasev1.RouteTable) func() *databasev1.RouteTable {
+	var calls atomic.Int32
+	return func() *databasev1.RouteTable {
+		if calls.Add(1) <= 2 {
+			return first
+		}
+		return rest
+	}
+}
+
+// TestFanOut_NodeEvictedMidWait_DropsAndAnnotates verifies that when a member
+// transitions Active → Evictable mid-call, the barrier drops it from
+// subsequent probes, records exactly one laggard with reason
+// "evicted_during_poll" carrying the last-observed mod_revision, and
+// converges based on the remaining members.
+func TestFanOut_NodeEvictedMidWait_DropsAndAnnotates(t *testing.T) {
+	cache := &staticBarrierCache{maxModRevision: 100}
+	tier1 := &fakeQueueClient{
+		routeTableFn: mutatingRouteTable(
+			&databasev1.RouteTable{Active: []string{"peer-A", "peer-B"}},
+			&databasev1.RouteTable{Active: []string{"peer-B"}, Evictable: []string{"peer-A"}},
+		),
+		statusClients: map[string]clusterv1.NodeSchemaStatusServiceClient{
+			"peer-A": &fakeNodeStatusClient{maxRev: 50},  // permanently behind
+			"peer-B": &fakeNodeStatusClient{maxRev: 100}, // already caught up
+		},
+	}
+	svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: newFakeTier(nil, nil), self: "self-liaison"}).build()
+
+	resp, err := svc.AwaitRevisionApplied(context.Background(), &schemav1.AwaitRevisionAppliedRequest{
+		MinRevision: 100,
+		Timeout:     durationpb.New(200 * time.Millisecond),
+	})
+	require.NoError(t, err)
+	assert.True(t, resp.GetApplied(),
+		"barrier should converge once peer-A is evicted (peer-B + self both at 100)")
+	require.Len(t, resp.GetLaggards(), 1, "exactly one evicted laggard expected")
+	assert.Equal(t, "liaison-peer-A", resp.GetLaggards()[0].GetNode())
+	assert.Equal(t, "evicted_during_poll", resp.GetLaggards()[0].GetReason())
+	assert.Equal(t, int64(50), resp.GetLaggards()[0].GetCurrentModRevision(),
+		"laggard should carry the last-observed revision before eviction")
+}
+
+// TestFanOut_NodeSetChangesMidWait_SkipsDepartedNodes verifies that when a
+// member's name disappears from BOTH active and evictable mid-call (graceful
+// leave), the barrier drops it silently — no laggard entry — and converges
+// on the remaining members.
+func TestFanOut_NodeSetChangesMidWait_SkipsDepartedNodes(t *testing.T) {
+	cache := &staticBarrierCache{maxModRevision: 100}
+	tier1 := &fakeQueueClient{
+		routeTableFn: mutatingRouteTable(
+			&databasev1.RouteTable{Active: []string{"peer-A", "peer-B"}},
+			&databasev1.RouteTable{Active: []string{"peer-B"}}, // peer-A simply gone
+		),
+		statusClients: map[string]clusterv1.NodeSchemaStatusServiceClient{
+			"peer-A": &fakeNodeStatusClient{maxRev: 50}, // would block if probed
+			"peer-B": &fakeNodeStatusClient{maxRev: 100},
+		},
+	}
+	svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: newFakeTier(nil, nil), self: "self-liaison"}).build()
+
+	resp, err := svc.AwaitRevisionApplied(context.Background(), &schemav1.AwaitRevisionAppliedRequest{
+		MinRevision: 100,
+		Timeout:     durationpb.New(200 * time.Millisecond),
+	})
+	require.NoError(t, err)
+	assert.True(t, resp.GetApplied(),
+		"barrier should converge once peer-A leaves (peer-B + self both at 100)")
+	assert.Empty(t, resp.GetLaggards(),
+		"a graceful leave (Active → Removed) should not produce a laggard entry")
+}
+
+// TestFanOut_LateJoiner_Excluded verifies that nodes entering Active after
+// the call's initial snapshot are NOT added to the watched set. A late
+// joiner with max_revision=0 must not cause spurious timeouts; barrier
+// returns Applied=true once the original watched set is ready.
+//
+// peer-A starts behind (rev=50) and catches up to 100 on its second probe;
+// peer-B is the late joiner — deliberately omitted from statusClients so
+// any erroneous probe attempt would surface as a laggard rather than a
+// silent pass.
+func TestFanOut_LateJoiner_Excluded(t *testing.T) {
+	cache := &staticBarrierCache{maxModRevision: 100}
+	var peerACalls int32
+	tier1 := &fakeQueueClient{
+		routeTableFn: mutatingRouteTable(
+			&databasev1.RouteTable{Active: []string{"peer-A"}},
+			&databasev1.RouteTable{Active: []string{"peer-A", "peer-B"}}, // peer-B joins
+		),
+		statusClients: map[string]clusterv1.NodeSchemaStatusServiceClient{
+			"peer-A": &fakeNodeStatusClient{revs: []int64{50, 100}, callsRef: &peerACalls},
+			// peer-B intentionally absent — any probe would error.
+		},
+	}
+	svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: newFakeTier(nil, nil), self: "self-liaison"}).build()
+
+	resp, err := svc.AwaitRevisionApplied(context.Background(), &schemav1.AwaitRevisionAppliedRequest{
+		MinRevision: 100,
+		Timeout:     durationpb.New(200 * time.Millisecond),
+	})
+	require.NoError(t, err)
+	assert.True(t, resp.GetApplied(),
+		"frozen snapshot should converge based on peer-A only; late-joiner peer-B is ignored")
+	assert.Empty(t, resp.GetLaggards(),
+		"laggards list must not contain the late joiner peer-B")
+}
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 631bd679c..c69e1e60d 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -2165,9 +2165,12 @@ requested keys.
 <a name="banyandb-schema-v1-NodeLaggard"></a>
 
 ### NodeLaggard
-NodeLaggard reports a single data node that has not caught up to the
-requested schema state. missing_keys is populated by AwaitSchemaApplied
-responses; still_present_keys is populated by AwaitSchemaDeleted responses.
+NodeLaggard reports a single cluster member (peer liaison or data node)
+that has not caught up to the requested schema state. missing_keys is
+populated by AwaitSchemaApplied responses; still_present_keys by
+AwaitSchemaDeleted responses. reason is set when the laggard exists for a
+non-default cause (e.g. "evicted_during_poll" when the cluster transitioned
+the member from Active to Evictable mid-call); empty otherwise.
 | Field | Type | Label | Description |
@@ -2176,6 +2179,7 @@ responses; still_present_keys is populated by AwaitSchemaDeleted responses.
 | current_mod_revision | [int64](#int64) |  |  |
 | missing_keys | [SchemaKey](#banyandb-schema-v1-SchemaKey) | repeated |  |
 | still_present_keys | [SchemaKey](#banyandb-schema-v1-SchemaKey) | repeated |  |
+| reason | [string](#string) |  |  |
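
Because this commit lets an Applied=true response still carry laggards, a caller may want to separate eviction notices from members that are merely behind. The following is a minimal caller-side sketch, not code from this commit: `Laggard` is a hypothetical stand-in for the generated `schemav1.NodeLaggard`, `splitLaggards` is an illustrative helper, and the node name `data-node-1` is made up. Only the `evicted_during_poll` value and the field names come from the diff above.

```go
package main

import "fmt"

// Laggard is a hypothetical stand-in for the generated schemav1.NodeLaggard
// message; it mirrors only the fields this sketch needs.
type Laggard struct {
	Node               string
	CurrentModRevision int64
	Reason             string
}

// evictedReason mirrors the "evicted_during_poll" value introduced by the commit.
const evictedReason = "evicted_during_poll"

// splitLaggards (illustrative helper) partitions laggards into eviction
// notices and members that are simply behind the target revision. An empty
// Reason is treated as the default "behind" case.
func splitLaggards(all []Laggard) (evicted, behind []Laggard) {
	for _, l := range all {
		if l.Reason == evictedReason {
			evicted = append(evicted, l)
			continue
		}
		behind = append(behind, l)
	}
	return evicted, behind
}

func main() {
	// Sample response content; "data-node-1" is an invented name.
	laggards := []Laggard{
		{Node: "liaison-peer-A", CurrentModRevision: 50, Reason: evictedReason},
		{Node: "data-node-1", CurrentModRevision: 80},
	}
	evicted, behind := splitLaggards(laggards)
	for _, l := range evicted {
		fmt.Printf("evicted mid-call: %s (last seen rev %d)\n", l.Node, l.CurrentModRevision)
	}
	for _, l := range behind {
		fmt.Printf("still behind: %s (rev %d)\n", l.Node, l.CurrentModRevision)
	}
}
```

With this split, a caller could treat a non-empty `behind` list as the reason a barrier timed out, and log the `evicted` list as informational even when the barrier succeeded.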
