This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch phase-2-cp5-march
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit e552d181d3001a73685ada447b67bd1347950c05
Author: Hongtao Gao <[email protected]>
AuthorDate: Mon May 4 06:19:46 2026 +0000

    feat(barrier): mid-call eviction / leave / late-join handling (Phase 2 §SS-1..SS-4)
    
    Phase 2 Step 2.2 snapshot-semantics: extend the cluster fan-out from #1111
    with the four frozen-snapshot rules from the plan §5.0 sequence diagram.
    Implements stories SS-1, SS-2, SS-3, SS-4 toward CP-5.
    
    SS-1 — NodeLaggard.reason proto field
    
      api/proto/banyandb/schema/v1/barrier.proto adds `string reason = 5;` to
      NodeLaggard, with a docstring covering the `evicted_during_poll` use case.
      Additive proto change — no wire-level breakage. docs/api-reference.md
      regen reflects the new field; barrier.pb.go is gitignored and regens on
      every build.
    
    SS-2 — Mid-call eviction (Active → Evictable)
    
      barrier_cluster.go now re-reads tier1 + tier2 route tables each iteration
      via currentMembership() and walks the alive set against the new view in
      applyTransitions(). Members that transitioned to Evictable are dropped
      from subsequent probes and recorded once in evictedLaggards with
      Reason="evicted_during_poll" carrying the last-observed mod_revision
      (sourced from lastRev[name], populated each iteration after probing).
    
      evictedLaggards accumulate across iterations and are surfaced regardless
      of overall outcome — applied=true responses include the eviction notices
      alongside an empty `revisionLaggards`, so callers can see what happened
      to the cluster mid-call.
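
    A caller-side sketch of how the accumulated notices might be consumed.
    The NodeLaggard struct below is a local stand-in for the generated
    message, and splitLaggards is a hypothetical helper that is not part of
    this commit; only the "evicted_during_poll" value comes from the change.

```go
package main

import "fmt"

// NodeLaggard is a local stand-in for the generated schemav1.NodeLaggard
// message; only the fields discussed in this commit are mirrored here.
type NodeLaggard struct {
	Node               string
	CurrentModRevision int64
	Reason             string
}

// evictedDuringPoll is the reason value named in the commit message.
const evictedDuringPoll = "evicted_during_poll"

// splitLaggards (hypothetical helper) separates eviction notices from
// members that are simply behind the requested revision.
func splitLaggards(laggards []NodeLaggard) (evicted, behind []NodeLaggard) {
	for _, l := range laggards {
		if l.Reason == evictedDuringPoll {
			evicted = append(evicted, l)
			continue
		}
		behind = append(behind, l)
	}
	return evicted, behind
}

func main() {
	// Shape of an applied=true response after one mid-call eviction:
	// no revision laggards, one eviction notice.
	laggards := []NodeLaggard{
		{Node: "liaison-peer-A", CurrentModRevision: 50, Reason: evictedDuringPoll},
	}
	evicted, behind := splitLaggards(laggards)
	fmt.Printf("evicted=%d behind=%d\n", len(evicted), len(behind))
}
```

    Because eviction notices can accompany applied=true, a caller that
    treats any non-empty laggard list as a failure would misread this
    response; filtering on reason first avoids that.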
    
    SS-3 — Mid-call leave (Active → Removed)
    
      applyTransitions drops members whose name disappears from BOTH active
      and evictable sets, without recording a laggard. Per the plan: a node
      that's gone from the cluster shouldn't block a barrier on its absence.
    
    SS-4 — Late-join exclusion
    
      The frozen `members` slice from snapshotMembers() is the loop's authority.
      applyTransitions never grows the alive set; nodes that enter Active after
      the snapshot are simply ignored. Late joiners with max_revision=0 don't
      cause spurious timeouts; they show up in subsequent calls.
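
    The three membership rules (SS-2 to SS-4) can be modelled in a few
    lines. This is a simplified sketch, not the code from the diff: members
    are plain strings, the `view` type and `step` function are illustrative
    names, and the self-member special case and lastRev bookkeeping are
    left out.

```go
package main

import "fmt"

// view is a simplified stand-in for the commit's membershipView.
type view struct {
	active    map[string]bool
	evictable map[string]bool
}

// step applies the frozen-snapshot rules to the current alive set:
//   - a member now listed as evictable is reported once and dropped (SS-2);
//   - a member absent from both sets is dropped silently (SS-3);
//   - names that appear only in the view are never added (SS-4).
func step(alive []string, v view) (kept, evicted []string) {
	for _, name := range alive {
		switch {
		case v.evictable[name]:
			evicted = append(evicted, name)
		case v.active[name]:
			kept = append(kept, name)
		}
		// Neither case matched: the member left the cluster.
	}
	return kept, evicted
}

func main() {
	frozen := []string{"peer-A", "peer-B", "peer-C"}
	v := view{
		// peer-C has left; peer-D joined after the snapshot.
		active:    map[string]bool{"peer-B": true, "peer-D": true},
		evictable: map[string]bool{"peer-A": true},
	}
	kept, evicted := step(frozen, v)
	fmt.Println(kept, evicted) // prints: [peer-B] [peer-A]
}
```

    Since the loop only ever iterates over the previous alive set, the
    late-join rule needs no explicit check: a name that was never in the
    snapshot can never be appended to `kept`.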
    
    Tests (barrier_cluster_test.go):
    
    - fakeNodeStatusClient grows a `revs []int64` field so a fake can return
      different revisions per probe (call k → revs[min(k,len-1)]) — needed to
      simulate a member catching up across iterations.
    - fakeQueueClient grows a `routeTableFn func()` override so a frozen-
      snapshot test can mutate cluster membership between iterations
      deterministically (counter-based, no goroutine racing).
    - mutatingRouteTable helper returns `first` for the first two
      GetRouteTable calls (snapshot + iter 1) and `rest` thereafter — matches
      the production assumption that eviction follows at least one probe of
      the member.
    - TestFanOut_NodeEvictedMidWait_DropsAndAnnotates: peer-A starts in Active
      at rev=50, transitions to Evictable on iter 2; barrier returns Applied=true
      with one laggard {Node="liaison-peer-A", Reason="evicted_during_poll",
      CurrentModRevision=50}.
    - TestFanOut_NodeSetChangesMidWait_SkipsDepartedNodes: peer-A simply
      vanishes from the route table on iter 2; barrier converges with no
      laggard for it.
    - TestFanOut_LateJoiner_Excluded: peer-B joins Active on iter 2 but is
      not in the frozen snapshot; barrier converges based on peer-A only and
      peer-B is never probed (its absence from statusClients would have
      surfaced as a laggard if it had been).
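
    The two deterministic test devices above, a call counter that switches
    the returned value and a clamped index into a per-call revision list,
    can be sketched in isolation. `switchAfter` and `clampedRev` are
    illustrative names, not helpers from the test file, and the generic
    signature is an assumption made to keep the sketch free of the
    generated RouteTable type.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// switchAfter returns a closure that yields first for the first n calls
// and rest for every call after that. The atomic counter keeps the switch
// point deterministic without any goroutine coordination.
func switchAfter[T any](n int32, first, rest T) func() T {
	var calls atomic.Int32
	return func() T {
		if calls.Add(1) <= n {
			return first
		}
		return rest
	}
}

// clampedRev returns revs[min(k, len(revs)-1)], so the last entry repeats
// once the list is exhausted. It assumes revs is non-empty.
func clampedRev(revs []int64, k int) int64 {
	if k >= len(revs) {
		k = len(revs) - 1
	}
	return revs[k]
}

func main() {
	// Threshold 2 mirrors the description above: one call for the
	// snapshot, one for iteration 1, then the mutated state.
	routeTable := switchAfter(2, "first", "rest")
	for i := 0; i < 4; i++ {
		fmt.Println(routeTable())
	}

	// A member that reports 50 on its first probe and 100 afterwards.
	revs := []int64{50, 100}
	for k := 0; k < 3; k++ {
		fmt.Println(clampedRev(revs, k))
	}
}
```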
    
    All 11 fanout unit tests pass; existing barrier_test.go tests (Phase 1
    single-cache path) remain green; lint clean.
    
    via [HAPI](https://hapi.run)
---
 api/proto/banyandb/schema/v1/barrier.proto   |  10 +-
 banyand/liaison/grpc/barrier_cluster.go      | 117 +++++++++++++++++++++--
 banyand/liaison/grpc/barrier_cluster_test.go | 138 ++++++++++++++++++++++++++-
 docs/api-reference.md                        |  10 +-
 4 files changed, 259 insertions(+), 16 deletions(-)

diff --git a/api/proto/banyandb/schema/v1/barrier.proto b/api/proto/banyandb/schema/v1/barrier.proto
index 8150d3051..5821fdecc 100644
--- a/api/proto/banyandb/schema/v1/barrier.proto
+++ b/api/proto/banyandb/schema/v1/barrier.proto
@@ -55,14 +55,18 @@ message AwaitRevisionAppliedRequest {
   google.protobuf.Duration timeout = 2;
 }
 
-// NodeLaggard reports a single data node that has not caught up to the
-// requested schema state. missing_keys is populated by AwaitSchemaApplied
-// responses; still_present_keys is populated by AwaitSchemaDeleted responses.
+// NodeLaggard reports a single cluster member (peer liaison or data node)
+// that has not caught up to the requested schema state. missing_keys is
+// populated by AwaitSchemaApplied responses; still_present_keys by
+// AwaitSchemaDeleted responses. reason is set when the laggard exists for a
+// non-default cause (e.g. "evicted_during_poll" when the cluster transitioned
+// the member from Active to Evictable mid-call); empty otherwise.
 message NodeLaggard {
   string node = 1;
   int64 current_mod_revision = 2;
   repeated SchemaKey missing_keys = 3;
   repeated SchemaKey still_present_keys = 4;
+  string reason = 5;
 }
 
 // AwaitRevisionAppliedResponse reports whether every node reached the target
diff --git a/banyand/liaison/grpc/barrier_cluster.go b/banyand/liaison/grpc/barrier_cluster.go
index 8cfab2938..3cbbae78e 100644
--- a/banyand/liaison/grpc/barrier_cluster.go
+++ b/banyand/liaison/grpc/barrier_cluster.go
@@ -124,13 +124,98 @@ type probeResult struct {
        ready  bool
 }
 
+// evictedLaggardReason is the laggard.reason value attached to a member that
+// transitioned Active → Evictable while a barrier call was in flight. The
+// cluster has already decided the node is unreliable; the barrier defers to
+// that decision rather than try to override it, but surfaces the eviction
+// once so callers see why the watched set shrank mid-call.
+const evictedLaggardReason = "evicted_during_poll"
+
+// membershipView is a snapshot of the tier1 + tier2 route tables collapsed
+// into name-keyed sets. The barrier loop refreshes this each iteration to
+// detect Active → Evictable / Active → Removed transitions on the frozen
+// member set built at call start.
+type membershipView struct {
+       active    map[string]struct{}
+       evictable map[string]struct{}
+}
+
+// currentMembership reads tier1 and tier2 route tables and merges their
+// Active and Evictable lists into name sets. Both providers contribute to
+// both sets — dedup is by name, so a hybrid host appearing in both tiers
+// counts once. The connection-pool layer guarantees that GetRouteTable()
+// returns a copy, so this read is safe to call concurrently without locking.
+func (b *barrierService) currentMembership() membershipView {
+       view := membershipView{
+               active:    map[string]struct{}{},
+               evictable: map[string]struct{}{},
+       }
+       addFromTier := func(provider func() queue.Client) {
+               if provider == nil {
+                       return
+               }
+               client := provider()
+               if client == nil {
+                       return
+               }
+               rt := client.GetRouteTable()
+               if rt == nil {
+                       return
+               }
+               for _, n := range rt.GetActive() {
+                       view.active[n] = struct{}{}
+               }
+               for _, n := range rt.GetEvictable() {
+                       view.evictable[n] = struct{}{}
+               }
+       }
+       addFromTier(b.peerLiaisons)
+       addFromTier(b.dataNodes)
+       return view
+}
+
+// applyTransitions walks the alive members against the current membership
+// view and partitions them into "still alive" (kept for the next probe) and
+// "newly evicted" (surfaced as one-time laggards with reason="evicted_during_poll").
+// Members whose names disappear from BOTH active and evictable are treated
+// as having left the cluster (graceful leave) and are dropped without a
+// laggard entry, per the frozen-snapshot policy. Self is always kept — the
+// in-process member never transitions.
+func applyTransitions(alive []member, view membershipView, lastRev map[string]int64) (kept []member, newlyEvicted []*schemav1.NodeLaggard) {
+       for _, m := range alive {
+               if m.isSelf {
+                       kept = append(kept, m)
+                       continue
+               }
+               if _, evicted := view.evictable[m.name]; evicted {
+                       newlyEvicted = append(newlyEvicted, &schemav1.NodeLaggard{
+                               Node:               m.laggardName(),
+                               CurrentModRevision: lastRev[m.name],
+                               Reason:             evictedLaggardReason,
+                       })
+                       continue
+               }
+               if _, active := view.active[m.name]; !active {
+                       // Absent from both sets: graceful leave. Drop silently per
+                       // the frozen-snapshot leave-during-poll branch.
+                       continue
+               }
+               kept = append(kept, m)
+       }
+       return kept, newlyEvicted
+}
+
 // awaitRevisionAppliedCluster runs the cluster-wide fan-out for
 // AwaitRevisionApplied. The receiving liaison's own cache is probed
 // in-process; peers are probed via clusterv1.NodeSchemaStatusService over the
-// *grpc.ClientConn borrowed from queue.Client.
+// *grpc.ClientConn borrowed from queue.Client. The frozen watched set built
+// at call start is shrunk mid-call when members transition Active → Evictable
+// (recorded as one-time `evicted_during_poll` laggards) or Active → Removed
+// (silent leave). Late joiners — nodes that enter Active after the snapshot
+// — are excluded; they show up in subsequent calls.
 func (b *barrierService) awaitRevisionAppliedCluster(ctx context.Context, req *schemav1.AwaitRevisionAppliedRequest) (*schemav1.AwaitRevisionAppliedResponse, error) {
-       members := b.snapshotMembers()
-       if len(members) == 0 {
+       frozen := b.snapshotMembers()
+       if len(frozen) == 0 {
                return nil, status.Errorf(codes.Unavailable, "no active cluster members")
        }
 
@@ -138,17 +223,35 @@ func (b *barrierService) awaitRevisionAppliedCluster(ctx context.Context, req *s
        pollCtx, cancel := context.WithDeadline(ctx, deadline)
        defer cancel()
 
+       alive := frozen
+       lastRev := map[string]int64{}
+       var evictedLaggards []*schemav1.NodeLaggard
        interval := barrierInitInterval
        var lastResults []probeResult
        for {
-               lastResults = b.probeMembers(pollCtx, members, req.GetMinRevision(), deadline)
+               view := b.currentMembership()
+               var newlyEvicted []*schemav1.NodeLaggard
+               alive, newlyEvicted = applyTransitions(alive, view, lastRev)
+               evictedLaggards = append(evictedLaggards, newlyEvicted...)
+
+               if len(alive) == 0 {
+                       // Every frozen member departed mid-call. The remaining cluster
+                       // has no objection to the target revision; surface the eviction
+                       // notices and return applied=true so callers can act.
+                       return &schemav1.AwaitRevisionAppliedResponse{Applied: true, Laggards: evictedLaggards}, nil
+               }
+
+               lastResults = b.probeMembers(pollCtx, alive, req.GetMinRevision(), deadline)
+               for _, r := range lastResults {
+                       lastRev[r.member.name] = r.rev
+               }
                if allReady(lastResults) {
-                       return &schemav1.AwaitRevisionAppliedResponse{Applied: true}, nil
+                       return &schemav1.AwaitRevisionAppliedResponse{Applied: true, Laggards: evictedLaggards}, nil
                }
                if time.Now().After(deadline) {
                        return &schemav1.AwaitRevisionAppliedResponse{
                                Applied:  false,
-                               Laggards: revisionLaggards(lastResults),
+                               Laggards: append(revisionLaggards(lastResults), evictedLaggards...),
                        }, nil
                }
                select {
@@ -156,7 +259,7 @@ func (b *barrierService) awaitRevisionAppliedCluster(ctx context.Context, req *s
                case <-pollCtx.Done():
                        return &schemav1.AwaitRevisionAppliedResponse{
                                Applied:  false,
-                               Laggards: revisionLaggards(lastResults),
+                               Laggards: append(revisionLaggards(lastResults), evictedLaggards...),
                        }, nil
                }
                interval = barrierBackoff(interval)
diff --git a/banyand/liaison/grpc/barrier_cluster_test.go b/banyand/liaison/grpc/barrier_cluster_test.go
index 21a828d7c..adcad0921 100644
--- a/banyand/liaison/grpc/barrier_cluster_test.go
+++ b/banyand/liaison/grpc/barrier_cluster_test.go
@@ -40,17 +40,22 @@ import (
 
 // fakeNodeStatusClient is a minimal clusterv1.NodeSchemaStatusServiceClient.
 // Only GetMaxRevision is implemented for Phase 2.2 tests; the other RPCs
-// land in 2.3/2.4 and panic if a test accidentally invokes them.
+// land in 2.3/2.4 and panic if a test accidentally invokes them. revs lets
+// frozen-snapshot tests vary the returned revision per call (call k returns
+// revs[min(k, len-1)]) so the barrier can observe a member catching up
+// across iterations.
 type fakeNodeStatusClient struct {
        err      error
        callsRef *int32
+       revs     []int64
        maxRev   int64
        delay    time.Duration
 }
 
 func (f *fakeNodeStatusClient) GetMaxRevision(ctx context.Context, _ *clusterv1.GetMaxRevisionRequest, _ ...grpc.CallOption) (*clusterv1.GetMaxRevisionResponse, error) {
+       n := int32(0)
        if f.callsRef != nil {
-               atomic.AddInt32(f.callsRef, 1)
+               n = atomic.AddInt32(f.callsRef, 1) - 1
        }
        if f.delay > 0 {
                select {
@@ -62,6 +67,13 @@ func (f *fakeNodeStatusClient) GetMaxRevision(ctx context.Context, _ *clusterv1.
        if f.err != nil {
                return nil, f.err
        }
+       if len(f.revs) > 0 {
+               idx := int(n)
+               if idx >= len(f.revs) {
+                       idx = len(f.revs) - 1
+               }
+               return &clusterv1.GetMaxRevisionResponse{MaxModRevision: f.revs[idx]}, nil
+       }
        return &clusterv1.GetMaxRevisionResponse{MaxModRevision: f.maxRev}, nil
 }
 
@@ -75,14 +87,19 @@ func (*fakeNodeStatusClient) GetAbsentKeys(_ context.Context, _ *clusterv1.GetAb
 
 // fakeQueueClient embeds queue.Client (a nil interface) so unused methods
 // panic at runtime; only the two methods the barrier fan-out actually calls
-// are overridden.
+// are overridden. routeTableFn (when set) overrides routeTable per call so
+// frozen-snapshot tests can mutate cluster membership between iterations.
 type fakeQueueClient struct {
        queue.Client
        routeTable    *databasev1.RouteTable
+       routeTableFn  func() *databasev1.RouteTable
        statusClients map[string]clusterv1.NodeSchemaStatusServiceClient
 }
 
 func (f *fakeQueueClient) GetRouteTable() *databasev1.RouteTable {
+       if f.routeTableFn != nil {
+               return f.routeTableFn()
+       }
        if f.routeTable == nil {
                return &databasev1.RouteTable{}
        }
@@ -319,3 +336,118 @@ func TestFanOut_NodeReturnsUnimplemented_TreatedAsReady(t *testing.T) {
                "Unimplemented from a Phase-1 peer must not block a v0.13 barrier caller")
        assert.Empty(t, resp.GetLaggards())
 }
+
+// mutatingRouteTable returns a closure that produces `first` for the first
+// two GetRouteTable calls and `rest` for every call thereafter. The two-call
+// threshold lines up with the production sequence: snapshotMembers consumes
+// call 1 (initial freeze), iteration 1's currentMembership consumes call 2,
+// and iteration 2 onwards observes the mutated state. This means the first
+// probe round runs against `first` (so per-member revs are recorded into
+// lastRev) before the transition fires — matching the production assumption
+// that eviction follows at least one probe of the member.
+func mutatingRouteTable(first, rest *databasev1.RouteTable) func() *databasev1.RouteTable {
+       var calls atomic.Int32
+       return func() *databasev1.RouteTable {
+               if calls.Add(1) <= 2 {
+                       return first
+               }
+               return rest
+       }
+}
+
+// TestFanOut_NodeEvictedMidWait_DropsAndAnnotates verifies that when a member
+// transitions Active → Evictable mid-call, the barrier drops it from
+// subsequent probes, records exactly one laggard with reason
+// "evicted_during_poll" carrying the last-observed mod_revision, and
+// converges based on the remaining members.
+func TestFanOut_NodeEvictedMidWait_DropsAndAnnotates(t *testing.T) {
+       cache := &staticBarrierCache{maxModRevision: 100}
+       tier1 := &fakeQueueClient{
+               routeTableFn: mutatingRouteTable(
+                       &databasev1.RouteTable{Active: []string{"peer-A", "peer-B"}},
+                       &databasev1.RouteTable{Active: []string{"peer-B"}, Evictable: []string{"peer-A"}},
+               ),
+               statusClients: map[string]clusterv1.NodeSchemaStatusServiceClient{
+                       "peer-A": &fakeNodeStatusClient{maxRev: 50},  // permanently behind
+                       "peer-B": &fakeNodeStatusClient{maxRev: 100}, // already caught up
+               },
+       }
+       svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: newFakeTier(nil, nil), self: "self-liaison"}).build()
+
+       resp, err := svc.AwaitRevisionApplied(context.Background(), &schemav1.AwaitRevisionAppliedRequest{
+               MinRevision: 100,
+               Timeout:     durationpb.New(200 * time.Millisecond),
+       })
+       require.NoError(t, err)
+       assert.True(t, resp.GetApplied(),
+               "barrier should converge once peer-A is evicted (peer-B + self both at 100)")
+       require.Len(t, resp.GetLaggards(), 1, "exactly one evicted laggard expected")
+       assert.Equal(t, "liaison-peer-A", resp.GetLaggards()[0].GetNode())
+       assert.Equal(t, "evicted_during_poll", resp.GetLaggards()[0].GetReason())
+       assert.Equal(t, int64(50), resp.GetLaggards()[0].GetCurrentModRevision(),
+               "laggard should carry the last-observed revision before eviction")
+}
+
+// TestFanOut_NodeSetChangesMidWait_SkipsDepartedNodes verifies that when a
+// member's name disappears from BOTH active and evictable mid-call (graceful
+// leave), the barrier drops it silently — no laggard entry — and converges
+// on the remaining members.
+func TestFanOut_NodeSetChangesMidWait_SkipsDepartedNodes(t *testing.T) {
+       cache := &staticBarrierCache{maxModRevision: 100}
+       tier1 := &fakeQueueClient{
+               routeTableFn: mutatingRouteTable(
+                       &databasev1.RouteTable{Active: []string{"peer-A", "peer-B"}},
+                       &databasev1.RouteTable{Active: []string{"peer-B"}}, // peer-A simply gone
+               ),
+               statusClients: map[string]clusterv1.NodeSchemaStatusServiceClient{
+                       "peer-A": &fakeNodeStatusClient{maxRev: 50}, // would block if probed
+                       "peer-B": &fakeNodeStatusClient{maxRev: 100},
+               },
+       }
+       svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: newFakeTier(nil, nil), self: "self-liaison"}).build()
+
+       resp, err := svc.AwaitRevisionApplied(context.Background(), &schemav1.AwaitRevisionAppliedRequest{
+               MinRevision: 100,
+               Timeout:     durationpb.New(200 * time.Millisecond),
+       })
+       require.NoError(t, err)
+       assert.True(t, resp.GetApplied(),
+               "barrier should converge once peer-A leaves (peer-B + self both at 100)")
+       assert.Empty(t, resp.GetLaggards(),
+               "a graceful leave (Active → Removed) should not produce a laggard entry")
+}
+
+// TestFanOut_LateJoiner_Excluded verifies that nodes entering Active after
+// the call's initial snapshot are NOT added to the watched set. A late
+// joiner with max_revision=0 must not cause spurious timeouts; barrier
+// returns Applied=true once the original watched set is ready.
+//
+// peer-A starts behind (rev=50) and catches up to 100 on its second probe;
+// peer-B is the late joiner — deliberately omitted from statusClients so
+// any erroneous probe attempt would surface as a laggard rather than a
+// silent pass.
+func TestFanOut_LateJoiner_Excluded(t *testing.T) {
+       cache := &staticBarrierCache{maxModRevision: 100}
+       var peerACalls int32
+       tier1 := &fakeQueueClient{
+               routeTableFn: mutatingRouteTable(
+                       &databasev1.RouteTable{Active: []string{"peer-A"}},
+                       &databasev1.RouteTable{Active: []string{"peer-A", "peer-B"}}, // peer-B joins
+               ),
+               statusClients: map[string]clusterv1.NodeSchemaStatusServiceClient{
+                       "peer-A": &fakeNodeStatusClient{revs: []int64{50, 100}, callsRef: &peerACalls},
+                       // peer-B intentionally absent — any probe would error.
+               },
+       }
+       svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: newFakeTier(nil, nil), self: "self-liaison"}).build()
+
+       resp, err := svc.AwaitRevisionApplied(context.Background(), &schemav1.AwaitRevisionAppliedRequest{
+               MinRevision: 100,
+               Timeout:     durationpb.New(200 * time.Millisecond),
+       })
+       require.NoError(t, err)
+       assert.True(t, resp.GetApplied(),
+               "frozen snapshot should converge based on peer-A only; late-joiner peer-B is ignored")
+       assert.Empty(t, resp.GetLaggards(),
+               "laggards list must not contain the late joiner peer-B")
+}
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 631bd679c..c69e1e60d 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -2165,9 +2165,12 @@ requested keys.
 <a name="banyandb-schema-v1-NodeLaggard"></a>
 
 ### NodeLaggard
-NodeLaggard reports a single data node that has not caught up to the
-requested schema state. missing_keys is populated by AwaitSchemaApplied
-responses; still_present_keys is populated by AwaitSchemaDeleted responses.
+NodeLaggard reports a single cluster member (peer liaison or data node)
+that has not caught up to the requested schema state. missing_keys is
+populated by AwaitSchemaApplied responses; still_present_keys by
+AwaitSchemaDeleted responses. reason is set when the laggard exists for a
+non-default cause (e.g. &#34;evicted_during_poll&#34; when the cluster transitioned
+the member from Active to Evictable mid-call); empty otherwise.
 
 
 | Field | Type | Label | Description |
@@ -2176,6 +2179,7 @@ responses; still_present_keys is populated by AwaitSchemaDeleted responses.
 | current_mod_revision | [int64](#int64) |  |  |
 | missing_keys | [SchemaKey](#banyandb-schema-v1-SchemaKey) | repeated |  |
 | still_present_keys | [SchemaKey](#banyandb-schema-v1-SchemaKey) | repeated |  |
+| reason | [string](#string) |  |  |
 
 
 
