This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch phase-2-cp5-march
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 6731df5c0bb16b4e19f32a64eedd8727d0daf328
Author: Hongtao Gao <[email protected]>
AuthorDate: Mon May 4 06:32:38 2026 +0000

    feat(barrier): cluster fan-out for AwaitSchemaDeleted (Phase 2 §FD-1, §FD-2)
    
    Phase 2 Step 2.4 — extend the cluster barrier to the deletion path.
    AwaitSchemaDeleted now probes the frozen tier1 + tier2 + self watched set
    in parallel via GetAbsentKeys; the receiving liaison's own cache is read
    in-process. The §5.0 differences from AwaitSchemaApplied: per-member probe
    RPC (GetAbsentKeys instead of GetKeyRevisions) and decision rule
    ("every key absent on every member" instead of "present at or above
    target rev"). Membership snapshot, chunking, shared deadline, cross-version
    policy, and transient-error handling are identical and reuse the helpers
    introduced in §FA-1.
    
    Algorithm:
    
    - Per-iteration loop and applyTransitionsApplied are reused (no
      duplication of eviction / leave / late-join logic).
    - Per-member probe (probeAbsentOne): chunks keys at barrierKeyChunkSize=1000
      per RPC, aggregates StillPresentKeys across chunks. Self uses an
      in-process scan that mirrors Phase 1's collectPresentKeys.
    - Cross-version: codes.Unimplemented from a peer ⇒ ready (assume every
      key absent). Other gRPC errors ⇒ transient laggard for this iteration 
only.
    - Nil-cache fail-closed: self with nil cache reports every key still
      present so the loop keeps polling, mirroring the Phase 1 standalone
      contract.
    
    Dispatch: AwaitSchemaDeleted now checks peerLiaisons / dataNodes / selfName
    and routes to awaitSchemaDeletedCluster when wired (production); falls
    back to the legacy in-process loop otherwise.
    
    Tests (barrier_cluster_test.go):
    
    - fakeNodeStatusClient.GetAbsentKeys is now a real impl (the Phase 2.2
      panic stub is gone). Reuses fakeKeyState; absent partition is "key in
      state with present=true" → still_present, else absent.
    - A new keyStateFn closure on the fake lets tests vary state per call
      (mirroring the mutatingRouteTable pattern from §SS-2..§SS-4) — needed
      for the SucceedsAfterAllNodesDrain regression.
    - TestAwaitSchemaDeleted_FanOut_ReportsStillPresent: per-node still_present
      laggards with role-prefixed identifiers.
    - TestAwaitSchemaDeleted_FanOut_SucceedsAfterAllNodesDrain: peer transitions
      from "both keys present" on call 1 to "both absent" on call 2; barrier
      converges with applied=true and zero laggards once both nodes drain.
    - TestAwaitSchemaDeleted_FanOut_MixedOldAndNewSchemasOnSameKey: after
      delete + recreate at higher rev, every member's GetAbsentKeys reports
      the key still_present (the recreated row is alive); barrier returns
      applied=false with the key surfaced as still_present per member.
    
    All 22 cluster fan-out tests pass; existing barrier_test.go (Phase 1
    helpers + standalone path) stays green; lint clean.
    
    via [HAPI](https://hapi.run)
---
 banyand/liaison/grpc/barrier.go              |  10 +-
 banyand/liaison/grpc/barrier_cluster.go      | 172 +++++++++++++++++++++++++++
 banyand/liaison/grpc/barrier_cluster_test.go | 146 ++++++++++++++++++++++-
 3 files changed, 324 insertions(+), 4 deletions(-)

diff --git a/banyand/liaison/grpc/barrier.go b/banyand/liaison/grpc/barrier.go
index ec152b8ec..c4a606736 100644
--- a/banyand/liaison/grpc/barrier.go
+++ b/banyand/liaison/grpc/barrier.go
@@ -176,9 +176,15 @@ func (b *barrierService) AwaitSchemaApplied(ctx 
context.Context, req *schemav1.A
        }
 }
 
-// AwaitSchemaDeleted blocks until all requested keys are absent from the 
cache,
-// or the timeout elapses.
+// AwaitSchemaDeleted blocks until all requested keys are absent from the
+// cache, or the timeout elapses. When the cluster fan-out dependencies are
+// wired (production), the call probes the frozen tier1 + tier2 + self
+// watched set in parallel via GetAbsentKeys; without them (Phase 1
+// unit-test path), it falls back to a single in-process cache poll.
 func (b *barrierService) AwaitSchemaDeleted(ctx context.Context, req 
*schemav1.AwaitSchemaDeletedRequest) (*schemav1.AwaitSchemaDeletedResponse, 
error) {
+       if b.peerLiaisons != nil && b.dataNodes != nil && b.selfName != nil {
+               return b.awaitSchemaDeletedCluster(ctx, req)
+       }
        if len(req.GetKeys()) > barrierMaxKeys {
                return nil, status.Errorf(codes.InvalidArgument, "too many 
keys: max=%d", barrierMaxKeys)
        }
diff --git a/banyand/liaison/grpc/barrier_cluster.go 
b/banyand/liaison/grpc/barrier_cluster.go
index 8b14410d1..6370fbc92 100644
--- a/banyand/liaison/grpc/barrier_cluster.go
+++ b/banyand/liaison/grpc/barrier_cluster.go
@@ -529,6 +529,178 @@ func (b *barrierService) probeKeysOne(ctx 
context.Context, m member, keys []*sch
        return keyProbeResult{member: m, missingKeys: missing, ready: 
len(missing) == 0}
 }
 
+// stillPresentResult is the per-iteration outcome of a GetAbsentKeys probe
+// for one member during AwaitSchemaDeleted. stillPresent carries the keys
+// the member's cache has not yet removed; ready=true when every requested
+// key is absent (stillPresent is empty).
+type stillPresentResult struct {
+       err          error
+       stillPresent []*schemav1.SchemaKey
+       member       member
+       ready        bool
+}
+
+// awaitSchemaDeletedCluster runs the cluster-wide fan-out for
+// AwaitSchemaDeleted. Per the §5.0 differences table the only deltas from
+// AwaitSchemaApplied are the per-member probe RPC (GetAbsentKeys instead of
+// GetKeyRevisions) and the decision rule (`every key absent on every member`
+// instead of `every key present at or above target rev`). Membership
+// snapshot, chunking, shared deadline, cross-version, and transient-error
+// policies are identical.
+func (b *barrierService) awaitSchemaDeletedCluster(ctx context.Context, req 
*schemav1.AwaitSchemaDeletedRequest) (*schemav1.AwaitSchemaDeletedResponse, 
error) {
+       if len(req.GetKeys()) > barrierMaxKeys {
+               return nil, status.Errorf(codes.InvalidArgument, "too many 
keys: max=%d", barrierMaxKeys)
+       }
+       frozen := b.snapshotMembers()
+       if len(frozen) == 0 {
+               return nil, status.Errorf(codes.Unavailable, "no active cluster 
members")
+       }
+
+       deadline := time.Now().Add(barrierDeadlineDuration(req.GetTimeout()))
+       pollCtx, cancel := context.WithDeadline(ctx, deadline)
+       defer cancel()
+
+       alive := frozen
+       var evictedLaggards []*schemav1.NodeLaggard
+       interval := barrierInitInterval
+       var lastResults []stillPresentResult
+       for {
+               view := b.currentMembership()
+               var newlyEvicted []*schemav1.NodeLaggard
+               alive, newlyEvicted = applyTransitionsApplied(alive, view)
+               evictedLaggards = append(evictedLaggards, newlyEvicted...)
+
+               if len(alive) == 0 {
+                       return &schemav1.AwaitSchemaDeletedResponse{Applied: 
true, Laggards: evictedLaggards}, nil
+               }
+
+               lastResults = b.probeAbsentKeys(pollCtx, alive, req.GetKeys(), 
deadline)
+               if allKeysAbsent(lastResults) {
+                       return &schemav1.AwaitSchemaDeletedResponse{Applied: 
true, Laggards: evictedLaggards}, nil
+               }
+               if time.Now().After(deadline) {
+                       return &schemav1.AwaitSchemaDeletedResponse{
+                               Applied:  false,
+                               Laggards: 
append(stillPresentLaggards(lastResults), evictedLaggards...),
+                       }, nil
+               }
+               select {
+               case <-time.After(interval):
+               case <-pollCtx.Done():
+                       return &schemav1.AwaitSchemaDeletedResponse{
+                               Applied:  false,
+                               Laggards: 
append(stillPresentLaggards(lastResults), evictedLaggards...),
+                       }, nil
+               }
+               interval = barrierBackoff(interval)
+       }
+}
+
+// probeAbsentKeys runs one parallel iteration of GetAbsentKeys probes
+// against the watched set, chunking keys at barrierKeyChunkSize with a
+// shared call-wide deadline.
+func (b *barrierService) probeAbsentKeys(ctx context.Context, members 
[]member, keys []*schemav1.SchemaKey, deadline time.Time) []stillPresentResult {
+       results := make([]stillPresentResult, len(members))
+       var wg sync.WaitGroup
+       probeCtx, cancel := context.WithDeadline(ctx, deadline)
+       defer cancel()
+       for i := range members {
+               wg.Add(1)
+               go func(idx int) {
+                       defer wg.Done()
+                       results[idx] = b.probeAbsentOne(probeCtx, members[idx], 
keys)
+               }(i)
+       }
+       wg.Wait()
+       return results
+}
+
+// probeAbsentOne returns the per-member outcome for one iteration of the
+// AwaitSchemaDeleted probe.
+func (b *barrierService) probeAbsentOne(ctx context.Context, m member, keys 
[]*schemav1.SchemaKey) stillPresentResult {
+       if m.isSelf {
+               c := b.cache()
+               if c == nil {
+                       // Fail-closed: nil cache → claim every key still 
present so
+                       // the loop keeps polling until the cache is online. 
Mirrors
+                       // Phase 1's collectPresentKeys nil-cache contract.
+                       present := make([]*schemav1.SchemaKey, len(keys))
+                       copy(present, keys)
+                       return stillPresentResult{member: m, stillPresent: 
present}
+               }
+               var present []*schemav1.SchemaKey
+               for _, key := range keys {
+                       propID := schemaKeyToPropID(key)
+                       if _, ok := c.GetKeyModRevision(propID); ok {
+                               present = append(present, key)
+                       }
+               }
+               return stillPresentResult{member: m, stillPresent: present, 
ready: len(present) == 0}
+       }
+
+       tier := b.peerLiaisons
+       if m.role == roleData {
+               tier = b.dataNodes
+       }
+       if tier == nil {
+               return stillPresentResult{member: m, err: errors.New("no tier 
client wired")}
+       }
+       client := tier()
+       if client == nil {
+               return stillPresentResult{member: m, err: errors.New("tier 
client unavailable")}
+       }
+       statusClient, err := client.NewNodeSchemaStatusClient(m.name)
+       if err != nil {
+               return stillPresentResult{member: m, err: err}
+       }
+
+       var present []*schemav1.SchemaKey
+       for chunkStart := 0; chunkStart < len(keys); chunkStart += 
barrierKeyChunkSize {
+               chunkEnd := chunkStart + barrierKeyChunkSize
+               if chunkEnd > len(keys) {
+                       chunkEnd = len(keys)
+               }
+               chunkKeys := keys[chunkStart:chunkEnd]
+               resp, rpcErr := statusClient.GetAbsentKeys(ctx, 
&clusterv1.GetAbsentKeysRequest{Keys: chunkKeys})
+               if rpcErr != nil {
+                       if status.Code(rpcErr) == codes.Unimplemented {
+                               return stillPresentResult{member: m, ready: 
true}
+                       }
+                       return stillPresentResult{member: m, err: rpcErr}
+               }
+               present = append(present, resp.GetStillPresentKeys()...)
+       }
+       return stillPresentResult{member: m, stillPresent: present, ready: 
len(present) == 0}
+}
+
+// allKeysAbsent reports whether every member's most recent probe found
+// every requested key absent (i.e. removed from its cache).
+func allKeysAbsent(results []stillPresentResult) bool {
+       for _, r := range results {
+               if !r.ready {
+                       return false
+               }
+       }
+       return true
+}
+
+// stillPresentLaggards builds the laggards list for the timeout response,
+// each entry carrying the per-node still_present_keys observed in the
+// final iteration.
+func stillPresentLaggards(results []stillPresentResult) 
[]*schemav1.NodeLaggard {
+       laggards := make([]*schemav1.NodeLaggard, 0)
+       for _, r := range results {
+               if r.ready {
+                       continue
+               }
+               laggards = append(laggards, &schemav1.NodeLaggard{
+                       Node:             r.member.laggardName(),
+                       StillPresentKeys: r.stillPresent,
+               })
+       }
+       return laggards
+}
+
 // allKeysApplied reports whether every member's most recent probe found
 // every key applied at or above its target revision.
 func allKeysApplied(results []keyProbeResult) bool {
diff --git a/banyand/liaison/grpc/barrier_cluster_test.go 
b/banyand/liaison/grpc/barrier_cluster_test.go
index 2921e6156..ce98f7c79 100644
--- a/banyand/liaison/grpc/barrier_cluster_test.go
+++ b/banyand/liaison/grpc/barrier_cluster_test.go
@@ -55,6 +55,7 @@ type fakeNodeStatusClient struct {
        err          error
        callsRef     *int32
        keyState     map[string]fakeKeyState
+       keyStateFn   func() map[string]fakeKeyState
        keyRevsCalls *int32
        revs         []int64
        maxRev       int64
@@ -126,8 +127,37 @@ func keyStateKey(kind, group, name string) string {
        return kind + "|" + group + "|" + name
 }
 
-func (*fakeNodeStatusClient) GetAbsentKeys(_ context.Context, _ 
*clusterv1.GetAbsentKeysRequest, _ ...grpc.CallOption) 
(*clusterv1.GetAbsentKeysResponse, error) {
-       panic("GetAbsentKeys: unused in Phase 2.2 tests")
+func (f *fakeNodeStatusClient) GetAbsentKeys(
+       ctx context.Context, req *clusterv1.GetAbsentKeysRequest, _ 
...grpc.CallOption,
+) (*clusterv1.GetAbsentKeysResponse, error) {
+       if f.keyRevsCalls != nil {
+               atomic.AddInt32(f.keyRevsCalls, 1)
+       }
+       if f.keyRevsDelay > 0 {
+               select {
+               case <-time.After(f.keyRevsDelay):
+               case <-ctx.Done():
+                       return nil, ctx.Err()
+               }
+       }
+       if f.err != nil {
+               return nil, f.err
+       }
+       state := f.keyState
+       if f.keyStateFn != nil {
+               state = f.keyStateFn()
+       }
+       keys := req.GetKeys()
+       var absent, present []*schemav1.SchemaKey
+       for _, k := range keys {
+               mapKey := k.GetKind() + "|" + k.GetGroup() + "|" + k.GetName()
+               if s, ok := state[mapKey]; ok && s.present {
+                       present = append(present, k)
+                       continue
+               }
+               absent = append(absent, k)
+       }
+       return &clusterv1.GetAbsentKeysResponse{AbsentKeys: absent, 
StillPresentKeys: present}, nil
 }
 
 // fakeQueueClient embeds queue.Client (a nil interface) so unused methods
@@ -687,3 +717,115 @@ func 
TestAwaitSchemaApplied_FanOut_PeerUnimplemented_TreatedAsReady(t *testing.T
                "Unimplemented from a Phase-1 peer must not block 
AwaitSchemaApplied")
        assert.Empty(t, resp.GetLaggards())
 }
+
+// TestAwaitSchemaDeleted_FanOut_ReportsStillPresent verifies that the
+// timeout response carries per-node still_present_keys with role-prefixed
+// identifiers — exact mirror of the AwaitSchemaApplied missing-keys layout
+// for the deletion barrier.
+func TestAwaitSchemaDeleted_FanOut_ReportsStillPresent(t *testing.T) {
+       keys := []*schemav1.SchemaKey{
+               streamKey("g", "k0"),
+               streamKey("g", "k1"),
+               streamKey("g", "k2"),
+       }
+       cache := &staticBarrierCache{maxModRevision: 100} // self has nothing → 
all absent
+       peerA := &fakeNodeStatusClient{keyState: map[string]fakeKeyState{
+               keyStateKey("stream", "g", "k0"): presentAt(100),
+               keyStateKey("stream", "g", "k1"): presentAt(100),
+       }} // still_present: k0, k1
+       peerB := &fakeNodeStatusClient{keyState: map[string]fakeKeyState{
+               keyStateKey("stream", "g", "k2"): presentAt(100),
+       }} // still_present: k2
+       tier1 := newFakeTier([]string{"peer-A", "peer-B"}, 
map[string]clusterv1.NodeSchemaStatusServiceClient{
+               "peer-A": peerA,
+               "peer-B": peerB,
+       })
+       svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: 
newFakeTier(nil, nil), self: "self-liaison"}).build()
+
+       resp, err := svc.AwaitSchemaDeleted(context.Background(), 
&schemav1.AwaitSchemaDeletedRequest{
+               Keys:    keys,
+               Timeout: durationpb.New(60 * time.Millisecond),
+       })
+       require.NoError(t, err)
+       assert.False(t, resp.GetApplied())
+       require.Len(t, resp.GetLaggards(), 2)
+
+       la := laggardByNode(resp.GetLaggards(), "liaison-peer-A")
+       require.NotNil(t, la, "laggard for peer-A must be present")
+       assert.Equal(t, []string{"k0", "k1"}, 
schemaKeyNames(la.GetStillPresentKeys()))
+
+       lb := laggardByNode(resp.GetLaggards(), "liaison-peer-B")
+       require.NotNil(t, lb, "laggard for peer-B must be present")
+       assert.Equal(t, []string{"k2"}, 
schemaKeyNames(lb.GetStillPresentKeys()))
+}
+
+// TestAwaitSchemaDeleted_FanOut_SucceedsAfterAllNodesDrain verifies that
+// once every member reports the keys absent, applied=true is returned. The
+// peer's keyState is mutated on the second probe call to simulate a drain
+// completing mid-call (real production: the watch event finally landed).
+func TestAwaitSchemaDeleted_FanOut_SucceedsAfterAllNodesDrain(t *testing.T) {
+       keys := []*schemav1.SchemaKey{streamKey("g", "k0"), streamKey("g", 
"k1")}
+       cache := &staticBarrierCache{maxModRevision: 100} // self drained from 
start
+       var calls int32
+       peer := &fakeNodeStatusClient{
+               keyRevsCalls: &calls,
+               keyStateFn: func() map[string]fakeKeyState {
+                       // Call 1: both keys still present; call 2+: both 
drained.
+                       if atomic.LoadInt32(&calls) <= 1 {
+                               return map[string]fakeKeyState{
+                                       keyStateKey("stream", "g", "k0"): 
presentAt(100),
+                                       keyStateKey("stream", "g", "k1"): 
presentAt(100),
+                               }
+                       }
+                       return map[string]fakeKeyState{}
+               },
+       }
+       tier1 := newFakeTier([]string{"peer"}, 
map[string]clusterv1.NodeSchemaStatusServiceClient{"peer": peer})
+       svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: 
newFakeTier(nil, nil), self: "self-liaison"}).build()
+
+       resp, err := svc.AwaitSchemaDeleted(context.Background(), 
&schemav1.AwaitSchemaDeletedRequest{
+               Keys:    keys,
+               Timeout: durationpb.New(200 * time.Millisecond),
+       })
+       require.NoError(t, err)
+       assert.True(t, resp.GetApplied(),
+               "barrier must converge once peer reports both keys absent on 
the second probe")
+       assert.Empty(t, resp.GetLaggards())
+       assert.GreaterOrEqual(t, atomic.LoadInt32(&calls), int32(2),
+               "the loop must have probed at least twice for the drain to be 
observed")
+}
+
+// TestAwaitSchemaDeleted_FanOut_MixedOldAndNewSchemasOnSameKey verifies
+// that after a delete + recreate at a higher revision, the absence probe
+// recognizes the new live row on every member and returns applied=false
+// with that key in still_present_keys (the new instance is alive — the
+// caller's deletion request is no longer accurate for it).
+//
+// Setup: key "k0" was deleted then recreated at rev=200. peer's cache
+// reports it Present with rev=200. Self's cache also reports it present.
+// The barrier must therefore time out and surface the still-present key.
+func TestAwaitSchemaDeleted_FanOut_MixedOldAndNewSchemasOnSameKey(t 
*testing.T) {
+       keys := []*schemav1.SchemaKey{streamKey("g", "k0")}
+       cache := &staticBarrierCache{
+               maxModRevision: 200,
+               keys:           map[string]int64{"stream_g/k0": 200}, // 
recreated at higher rev
+       }
+       peer := &fakeNodeStatusClient{keyState: map[string]fakeKeyState{
+               keyStateKey("stream", "g", "k0"): presentAt(200),
+       }}
+       tier1 := newFakeTier([]string{"peer"}, 
map[string]clusterv1.NodeSchemaStatusServiceClient{"peer": peer})
+       svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: 
newFakeTier(nil, nil), self: "self-liaison"}).build()
+
+       resp, err := svc.AwaitSchemaDeleted(context.Background(), 
&schemav1.AwaitSchemaDeletedRequest{
+               Keys:    keys,
+               Timeout: durationpb.New(40 * time.Millisecond),
+       })
+       require.NoError(t, err)
+       assert.False(t, resp.GetApplied(),
+               "a recreated-at-higher-rev key is alive on every member; 
deletion barrier must NOT report applied=true")
+       require.NotEmpty(t, resp.GetLaggards())
+       for _, l := range resp.GetLaggards() {
+               assert.Equal(t, []string{"k0"}, 
schemaKeyNames(l.GetStillPresentKeys()),
+                       "every member's laggard entry must list k0 as still 
present")
+       }
+}

Reply via email to