This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch phase-2-cp5-march
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit e9e1afcc9d43aa42d0f69080c0ed8faf8e6d46fa
Author: Hongtao Gao <[email protected]>
AuthorDate: Mon May 4 06:28:06 2026 +0000

    feat(barrier): cluster fan-out for AwaitSchemaApplied (Phase 2 §FA-1, §FA-2)
    
    Phase 2 Step 2.3 — extend the cluster barrier from #1111 (Step 2.2) to the
    per-key path. AwaitSchemaApplied now probes a frozen tier1 + tier2 + self
    watched set in parallel via GetKeyRevisions; the receiving liaison's own
    cache is read in-process. Snapshot semantics — eviction / leave / late-join —
    match the AwaitRevisionApplied behavior added in §SS-1..SS-4.
    
    Algorithm:
    
    - Per-iteration loop: re-snapshot tier1 + tier2 route tables, apply
      membership transitions (applyTransitionsApplied), probe alive set,
      decide. Eviction laggards for the per-key path carry only Node + Reason
      ("evicted_during_poll"); the cluster has already removed the member, so
      its outstanding key state is no longer something the call is waiting on.
    - Per-member probe: chunk keys at barrierKeyChunkSize=1000 per RPC; each
      per-chunk context inherits time.Until(deadline) (shared, NOT divided
      across chunks or members). Total wall-clock stays bounded by req.Timeout
      regardless of fan-out width or chunk count.
    - Self path: in-process scan via barrierCacheReader, mirroring Phase 1's
      collectMissingKeys logic.
    - Cross-version policy: a peer returning codes.Unimplemented from
      GetKeyRevisions (Phase-1 v0.11/v0.12 node) is treated as ready (assume
      every key applied) so partial-upgrade clusters do not deadlock barrier
      callers. Other gRPC errors → transient laggard for this iteration only.
    - Backoff cadence is unchanged: 10ms init × 1.5 cap 500ms; default 5s
      timeout.
    
    Dispatch: AwaitSchemaApplied checks peerLiaisons / dataNodes / selfName at
    the top of the call and routes to awaitSchemaAppliedCluster when all three
    are wired (production); falls back to the legacy in-process loop otherwise
    (Phase 1 unit-test path).
    
    Tests (barrier_cluster_test.go):
    
    - fakeNodeStatusClient gains a real GetKeyRevisions impl backed by a
      per-key state map (fakeKeyState{rev, present}); per-call counters and
      delays let tests verify chunking + shared-deadline behavior.
    - TestAwaitSchemaApplied_FanOut_PerKeyLaggards: timeout response carries
      per-node missing_keys with role-prefixed identifiers.
    - TestAwaitSchemaApplied_FanOut_ChunkedKeys: a 1500-key request triggers
      ≥ 2 GetKeyRevisions calls per peer (1000 + 500) per iteration; the
      per-peer aggregated missing_keys covers all 1500 across chunks.
    - TestAwaitSchemaApplied_FanOut_SharedDeadline: regression — with each
      chunk delay 100ms and req.Timeout 50ms, total wall-clock stays under
      180ms (NOT 2 × 100ms = 200ms, which is what equal-slice timeout
      division would produce).
    - TestAwaitSchemaApplied_FanOut_PeerUnimplemented_TreatedAsReady:
      Unimplemented from a Phase-1 peer must not block AwaitSchemaApplied.
    
    All 19 cluster fan-out tests pass; existing barrier_test.go tests (Phase 1
    single-cache path + collectMissingKeys / collectPresentKeys helpers) stay
    green; lint clean.
    
    via [HAPI](https://hapi.run)
---
 banyand/liaison/grpc/barrier.go              |   8 +-
 banyand/liaison/grpc/barrier_cluster.go      | 228 +++++++++++++++++++++++
 banyand/liaison/grpc/barrier_cluster_test.go | 260 +++++++++++++++++++++++++--
 3 files changed, 483 insertions(+), 13 deletions(-)

diff --git a/banyand/liaison/grpc/barrier.go b/banyand/liaison/grpc/barrier.go
index 098301935..ec152b8ec 100644
--- a/banyand/liaison/grpc/barrier.go
+++ b/banyand/liaison/grpc/barrier.go
@@ -138,8 +138,14 @@ func (b *barrierService) AwaitRevisionApplied(ctx 
context.Context, req *schemav1
 }
 
 // AwaitSchemaApplied blocks until all requested keys are present at or above 
their
-// per-key min_revisions, or the timeout elapses.
+// per-key min_revisions, or the timeout elapses. When the cluster fan-out
+// dependencies are wired (production), the call probes the frozen tier1 +
+// tier2 + self watched set in parallel via GetKeyRevisions; without them
+// (Phase 1 unit-test path), it falls back to a single in-process cache poll.
 func (b *barrierService) AwaitSchemaApplied(ctx context.Context, req 
*schemav1.AwaitSchemaAppliedRequest) (*schemav1.AwaitSchemaAppliedResponse, 
error) {
+       if b.peerLiaisons != nil && b.dataNodes != nil && b.selfName != nil {
+               return b.awaitSchemaAppliedCluster(ctx, req)
+       }
        if len(req.GetKeys()) > barrierMaxKeys {
                return nil, status.Errorf(codes.InvalidArgument, "too many 
keys: max=%d", barrierMaxKeys)
        }
diff --git a/banyand/liaison/grpc/barrier_cluster.go 
b/banyand/liaison/grpc/barrier_cluster.go
index 3cbbae78e..8b14410d1 100644
--- a/banyand/liaison/grpc/barrier_cluster.go
+++ b/banyand/liaison/grpc/barrier_cluster.go
@@ -329,6 +329,234 @@ func (b *barrierService) probeOne(ctx context.Context, m 
member, minRev int64) p
        return probeResult{member: m, rev: rev, ready: rev >= minRev}
 }
 
+// keyProbeResult is the per-iteration outcome of a GetKeyRevisions probe for
+// one member. missingKeys carries only the keys that failed the apply check
+// (absent or below their target revision). ready=true when the probe found no
+// missing keys or the peer answered codes.Unimplemented; err!=nil means the
+// probe is a transient laggard for this iteration only.
+type keyProbeResult struct {
+       err         error // non-nil when the probe could not complete; ready stays false
+       missingKeys []*schemav1.SchemaKey // keys found absent or below their target revision
+       member      member // the watched-set member this result belongs to
+       ready       bool // true when the call no longer has to wait on this member
+}
+
+// barrierKeyChunkSize matches the plan §2.3 chunking policy: each peer probe
+// for AwaitSchemaApplied / AwaitSchemaDeleted issues at most 1000 keys per
+// RPC. The server-side cap (nodeStatusMaxKeys = 10000) is the absolute limit;
+// 1000 is a soft chunking threshold that keeps individual RPC payloads well
+// under typical gRPC max-message-size defaults and fans the work out across
+// multiple round-trips for large requests.
+const barrierKeyChunkSize = 1000
+
+// awaitSchemaAppliedCluster is the cluster-wide implementation of
+// AwaitSchemaApplied. It freezes the watched set once via snapshotMembers and
+// then loops: refresh the membership view, drop members that were evicted or
+// are no longer active, probe the survivors with probeKeyRevisions, and
+// either report success, report a timeout, or back off and try again.
+//
+// The deadline is derived from req.Timeout once and shared by every probe of
+// every iteration. Members evicted during the call are accumulated and
+// returned in Laggards on both outcomes; on the Applied=false response they
+// follow the stragglers of the last probe round.
+//
+// Errors: InvalidArgument when more than barrierMaxKeys keys are requested,
+// Unavailable when the initial snapshot is empty. Running out of time or a
+// finished context is not an error; it yields Applied=false.
+func (b *barrierService) awaitSchemaAppliedCluster(ctx context.Context, req *schemav1.AwaitSchemaAppliedRequest) (*schemav1.AwaitSchemaAppliedResponse, error) {
+       keys := req.GetKeys()
+       if len(keys) > barrierMaxKeys {
+               return nil, status.Errorf(codes.InvalidArgument, "too many keys: max=%d", barrierMaxKeys)
+       }
+       watched := b.snapshotMembers()
+       if len(watched) == 0 {
+               return nil, status.Errorf(codes.Unavailable, "no active cluster members")
+       }
+
+       deadline := time.Now().Add(barrierDeadlineDuration(req.GetTimeout()))
+       pollCtx, cancel := context.WithDeadline(ctx, deadline)
+       defer cancel()
+
+       var evicted []*schemav1.NodeLaggard
+       // timedOut builds the Applied=false response: stragglers of the given
+       // probe round first, then every member evicted so far.
+       timedOut := func(round []keyProbeResult) *schemav1.AwaitSchemaAppliedResponse {
+               return &schemav1.AwaitSchemaAppliedResponse{
+                       Applied:  false,
+                       Laggards: append(keyLaggards(round), evicted...),
+               }
+       }
+
+       minRevs := req.GetMinRevisions()
+       wait := barrierInitInterval
+       for {
+               var dropped []*schemav1.NodeLaggard
+               watched, dropped = applyTransitionsApplied(watched, b.currentMembership())
+               evicted = append(evicted, dropped...)
+
+               // Nobody left to wait on: the barrier is trivially satisfied.
+               if len(watched) == 0 {
+                       return &schemav1.AwaitSchemaAppliedResponse{Applied: true, Laggards: evicted}, nil
+               }
+
+               round := b.probeKeyRevisions(pollCtx, watched, keys, minRevs, deadline)
+               if allKeysApplied(round) {
+                       return &schemav1.AwaitSchemaAppliedResponse{Applied: true, Laggards: evicted}, nil
+               }
+               if time.Now().After(deadline) {
+                       return timedOut(round), nil
+               }
+
+               // Sleep for the current backoff interval unless the context ends first.
+               select {
+               case <-pollCtx.Done():
+                       return timedOut(round), nil
+               case <-time.After(wait):
+               }
+               wait = barrierBackoff(wait)
+       }
+}
+
+// applyTransitionsApplied filters the currently watched members against a
+// fresh membership view for the per-key barriers. The self member is always
+// kept. A peer listed in view.evictable is dropped and reported as a laggard
+// carrying only its role-prefixed name and evictedLaggardReason; no key lists
+// are attached because the call is no longer waiting on that member. A peer
+// that is neither evictable nor in view.active is dropped without a laggard
+// entry. Everyone else stays in the returned kept slice, in input order.
+func applyTransitionsApplied(alive []member, view membershipView) (kept []member, newlyEvicted []*schemav1.NodeLaggard) {
+       for i := range alive {
+               cur := alive[i]
+               if !cur.isSelf {
+                       if _, gone := view.evictable[cur.name]; gone {
+                               newlyEvicted = append(newlyEvicted, &schemav1.NodeLaggard{
+                                       Node:   cur.laggardName(),
+                                       Reason: evictedLaggardReason,
+                               })
+                               continue
+                       }
+                       if _, stillActive := view.active[cur.name]; !stillActive {
+                               continue
+                       }
+               }
+               kept = append(kept, cur)
+       }
+       return kept, newlyEvicted
+}
+
+// probeKeyRevisions performs one probe round: it calls probeKeysOne for every
+// member concurrently and waits for all of them to finish. Every probe runs
+// under a single context bounded by the call-wide deadline, so the budget is
+// shared rather than split between members. The returned slice is indexed
+// like members (result i belongs to members[i]).
+func (b *barrierService) probeKeyRevisions(ctx context.Context, members []member, keys []*schemav1.SchemaKey, minRevs []int64, deadline time.Time) []keyProbeResult {
+       sharedCtx, release := context.WithDeadline(ctx, deadline)
+       defer release()
+
+       out := make([]keyProbeResult, len(members))
+       var pending sync.WaitGroup
+       pending.Add(len(members))
+       for slot, target := range members {
+               // Each goroutine writes only its own slot, so no locking is needed.
+               go func(slot int, target member) {
+                       defer pending.Done()
+                       out[slot] = b.probeKeysOne(sharedCtx, target, keys, minRevs)
+               }(slot, target)
+       }
+       pending.Wait()
+       return out
+}
+
+// probeKeysOne returns the per-member outcome for one iteration of
+// AwaitSchemaApplied's per-key probe.
+//
+// Self is answered from the in-process cache: a key is missing when the cache
+// does not know it, or when its minRevs entry is positive and the cached
+// revision is lower. A nil cache fails closed (every key missing).
+//
+// Peers are asked via GetKeyRevisions in chunks of barrierKeyChunkSize, all
+// sharing ctx. codes.Unimplemented marks the member ready (cross-version
+// policy for peers that lack the RPC). Any other RPC error, a failure to
+// obtain the status client, or a response whose length differs from the chunk
+// is returned in err and leaves ready false, making the member a laggard for
+// this iteration only.
+//
+// minRevs is matched to keys by index; a key without a minRevs entry (or with
+// a non-positive one) only needs to be present.
+func (b *barrierService) probeKeysOne(ctx context.Context, m member, keys []*schemav1.SchemaKey, minRevs []int64) keyProbeResult {
+       if m.isSelf {
+               c := b.cache()
+               if c == nil {
+                       // Fail-closed: nil cache → every key is missing so the loop
+                       // keeps polling until the cache is online.
+                       missing := make([]*schemav1.SchemaKey, len(keys))
+                       copy(missing, keys)
+                       return keyProbeResult{member: m, missingKeys: missing}
+               }
+               var missing []*schemav1.SchemaKey
+               for idx, key := range keys {
+                       propID := schemaKeyToPropID(key)
+                       rev, ok := c.GetKeyModRevision(propID)
+                       var minRev int64
+                       if idx < len(minRevs) {
+                               minRev = minRevs[idx]
+                       }
+                       if !ok || (minRev > 0 && rev < minRev) {
+                               missing = append(missing, key)
+                       }
+               }
+               return keyProbeResult{member: m, missingKeys: missing, ready: len(missing) == 0}
+       }
+
+       tier := b.peerLiaisons
+       if m.role == roleData {
+               tier = b.dataNodes
+       }
+       if tier == nil {
+               return keyProbeResult{member: m, err: errors.New("no tier client wired")}
+       }
+       client := tier()
+       if client == nil {
+               return keyProbeResult{member: m, err: errors.New("tier client unavailable")}
+       }
+       statusClient, err := client.NewNodeSchemaStatusClient(m.name)
+       if err != nil {
+               return keyProbeResult{member: m, err: err}
+       }
+
+       var missing []*schemav1.SchemaKey
+       for chunkStart := 0; chunkStart < len(keys); chunkStart += barrierKeyChunkSize {
+               chunkEnd := chunkStart + barrierKeyChunkSize
+               if chunkEnd > len(keys) {
+                       chunkEnd = len(keys)
+               }
+               chunkKeys := keys[chunkStart:chunkEnd]
+               resp, rpcErr := statusClient.GetKeyRevisions(ctx, &clusterv1.GetKeyRevisionsRequest{Keys: chunkKeys})
+               if rpcErr != nil {
+                       // Cross-version: a Phase-1 peer answers Unimplemented; treat
+                       // that member as ready (assume every key applied).
+                       if status.Code(rpcErr) == codes.Unimplemented {
+                               return keyProbeResult{member: m, ready: true}
+                       }
+                       // Other RPC errors are transient laggards for this iteration.
+                       return keyProbeResult{member: m, err: rpcErr}
+               }
+               revisions := resp.GetRevisions()
+               if len(revisions) != len(chunkKeys) {
+                       // Entries are matched to the request by position. Extra
+                       // entries would index past chunkKeys and panic; missing
+                       // entries would silently count as applied. Fail closed
+                       // for this iteration instead of guessing.
+                       return keyProbeResult{member: m, err: errors.New("key revisions response length does not match request")}
+               }
+               for i, kr := range revisions {
+                       globalIdx := chunkStart + i
+                       var minRev int64
+                       if globalIdx < len(minRevs) {
+                               minRev = minRevs[globalIdx]
+                       }
+                       if !kr.GetPresent() || (minRev > 0 && kr.GetModRevision() < minRev) {
+                               missing = append(missing, chunkKeys[i])
+                       }
+               }
+       }
+       return keyProbeResult{member: m, missingKeys: missing, ready: len(missing) == 0}
+}
+
+// allKeysApplied returns true only when every result in the probe round is
+// marked ready. An empty round is vacuously true.
+func allKeysApplied(results []keyProbeResult) bool {
+       for i := range results {
+               if !results[i].ready {
+                       return false
+               }
+       }
+       return true
+}
+
+// keyLaggards converts a probe round into the laggard list of the timeout
+// response. Ready members are left out; every other member contributes one
+// entry with its role-prefixed name and the keys its probe reported missing
+// (none when the probe itself failed). The result is never nil.
+func keyLaggards(results []keyProbeResult) []*schemav1.NodeLaggard {
+       stragglers := make([]*schemav1.NodeLaggard, 0, len(results))
+       for i := range results {
+               res := results[i]
+               if !res.ready {
+                       stragglers = append(stragglers, &schemav1.NodeLaggard{
+                               Node:        res.member.laggardName(),
+                               MissingKeys: res.missingKeys,
+                       })
+               }
+       }
+       return stragglers
+}
+
 // allReady reports whether every member's most recent probe returned ready.
 func allReady(results []probeResult) bool {
        for _, r := range results {
diff --git a/banyand/liaison/grpc/barrier_cluster_test.go 
b/banyand/liaison/grpc/barrier_cluster_test.go
index adcad0921..2921e6156 100644
--- a/banyand/liaison/grpc/barrier_cluster_test.go
+++ b/banyand/liaison/grpc/barrier_cluster_test.go
@@ -21,6 +21,7 @@ import (
        "context"
        "errors"
        "sort"
+       "strconv"
        "sync/atomic"
        "testing"
        "time"
@@ -38,18 +39,27 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/queue"
 )
 
+// fakeKeyState backs the GetKeyRevisions fake response for a single key.
+// Keys absent from the keyState map default to Present=false (the natural
+// "node hasn't seen this key" semantic).
+type fakeKeyState struct {
+       rev     int64
+       present bool
+}
+
 // fakeNodeStatusClient is a minimal clusterv1.NodeSchemaStatusServiceClient.
-// Only GetMaxRevision is implemented for Phase 2.2 tests; the other RPCs
-// land in 2.3/2.4 and panic if a test accidentally invokes them. revs lets
-// frozen-snapshot tests vary the returned revision per call (call k returns
-// revs[min(k, len-1)]) so the barrier can observe a member catching up
-// across iterations.
+// GetMaxRevision and GetKeyRevisions are implemented; GetAbsentKeys remains
+// a panic stub until Phase 2.4 (FD-1/FD-2). Per-call counters and delays let
+// tests verify chunking + shared-deadline regression behavior.
 type fakeNodeStatusClient struct {
-       err      error
-       callsRef *int32
-       revs     []int64
-       maxRev   int64
-       delay    time.Duration
+       err          error
+       callsRef     *int32
+       keyState     map[string]fakeKeyState
+       keyRevsCalls *int32
+       revs         []int64
+       maxRev       int64
+       delay        time.Duration
+       keyRevsDelay time.Duration
 }
 
 func (f *fakeNodeStatusClient) GetMaxRevision(ctx context.Context, _ 
*clusterv1.GetMaxRevisionRequest, _ ...grpc.CallOption) 
(*clusterv1.GetMaxRevisionResponse, error) {
@@ -77,8 +87,43 @@ func (f *fakeNodeStatusClient) GetMaxRevision(ctx 
context.Context, _ *clusterv1.
        return &clusterv1.GetMaxRevisionResponse{MaxModRevision: f.maxRev}, nil
 }
 
-func (*fakeNodeStatusClient) GetKeyRevisions(_ context.Context, _ 
*clusterv1.GetKeyRevisionsRequest, _ ...grpc.CallOption) 
(*clusterv1.GetKeyRevisionsResponse, error) {
-       panic("GetKeyRevisions: unused in Phase 2.2 tests")
+// GetKeyRevisions is the fake's per-key probe. It bumps keyRevsCalls (when
+// set), waits keyRevsDelay (returning ctx.Err() if ctx ends first), returns
+// f.err when configured, and otherwise answers with one KeyRevision per
+// requested key, in request order. State is looked up through keyStateKey so
+// the fake and the tests that seed keyState share one key format; a key with
+// no entry is reported as not present at revision 0.
+func (f *fakeNodeStatusClient) GetKeyRevisions(
+       ctx context.Context, req *clusterv1.GetKeyRevisionsRequest, _ ...grpc.CallOption,
+) (*clusterv1.GetKeyRevisionsResponse, error) {
+       if f.keyRevsCalls != nil {
+               atomic.AddInt32(f.keyRevsCalls, 1)
+       }
+       if f.keyRevsDelay > 0 {
+               select {
+               case <-time.After(f.keyRevsDelay):
+               case <-ctx.Done():
+                       return nil, ctx.Err()
+               }
+       }
+       if f.err != nil {
+               return nil, f.err
+       }
+       keys := req.GetKeys()
+       revs := make([]*clusterv1.KeyRevision, len(keys))
+       for i, k := range keys {
+               // A missing map entry yields the zero fakeKeyState, which is
+               // exactly "not present, revision 0".
+               state := f.keyState[keyStateKey(k.GetKind(), k.GetGroup(), k.GetName())]
+               revs[i] = &clusterv1.KeyRevision{Key: k, Present: state.present, ModRevision: state.rev}
+       }
+       return &clusterv1.GetKeyRevisionsResponse{Revisions: revs}, nil
+}
+
+// keyStateKey produces the deterministic map key used by fakeNodeStatusClient
+// to look up per-(kind,group,name) state. Tests currently exercise only the
+// "stream" kind; extend the signature when other kinds need direct state
+// injection.
+//
+//nolint:unparam // kind/group/name kept symmetric with SchemaKey for clarity.
+func keyStateKey(kind, group, name string) string {
+       return kind + "|" + group + "|" + name
 }
 
 func (*fakeNodeStatusClient) GetAbsentKeys(_ context.Context, _ 
*clusterv1.GetAbsentKeysRequest, _ ...grpc.CallOption) 
(*clusterv1.GetAbsentKeysResponse, error) {
@@ -451,3 +496,194 @@ func TestFanOut_LateJoiner_Excluded(t *testing.T) {
        assert.Empty(t, resp.GetLaggards(),
                "laggards list must not contain the late joiner peer-B")
 }
+
+// presentAt builds a fakeKeyState shorthand for "key is present on this
+// member at the given revision."
+//
+//nolint:unparam // tests currently target rev=100 only; the parameter is kept 
for clarity.
+func presentAt(rev int64) fakeKeyState { return fakeKeyState{rev: rev, 
present: true} }
+
+// streamKey is a SchemaKey constructor for the cluster fan-out tests.
+//
+//nolint:unparam // tests currently use a single group "g"; group remains in 
the signature for readability.
+func streamKey(group, name string) *schemav1.SchemaKey {
+       return &schemav1.SchemaKey{Kind: "stream", Group: group, Name: name}
+}
+
+// laggardByNode looks up the first NodeLaggard whose Node equals name and
+// returns it, or nil when no entry matches. It lets tests assert on one
+// node's payload regardless of the order of the laggards slice.
+func laggardByNode(laggards []*schemav1.NodeLaggard, name string) *schemav1.NodeLaggard {
+       for i := range laggards {
+               if candidate := laggards[i]; candidate.GetNode() == name {
+                       return candidate
+               }
+       }
+       return nil
+}
+
+// TestAwaitSchemaApplied_FanOut_PerKeyLaggards verifies that the timeout
+// response carries per-node missing_keys with role-prefixed identifiers, so
+// the caller can see exactly which keys are outstanding on which member.
+func TestAwaitSchemaApplied_FanOut_PerKeyLaggards(t *testing.T) {
+       keys := []*schemav1.SchemaKey{
+               streamKey("g", "k0"),
+               streamKey("g", "k1"),
+               streamKey("g", "k2"),
+               streamKey("g", "k3"),
+               streamKey("g", "k4"),
+       }
+       cache := &staticBarrierCache{
+               maxModRevision: 100,
+               keys: map[string]int64{
+                       "stream_g/k0": 100, "stream_g/k1": 100,
+                       "stream_g/k2": 100, "stream_g/k3": 100, "stream_g/k4": 
100,
+               },
+       }
+       peerA := &fakeNodeStatusClient{keyState: map[string]fakeKeyState{
+               keyStateKey("stream", "g", "k0"): presentAt(100),
+               keyStateKey("stream", "g", "k1"): presentAt(100),
+       }} // missing: k2, k3, k4
+       peerB := &fakeNodeStatusClient{keyState: map[string]fakeKeyState{
+               keyStateKey("stream", "g", "k0"): presentAt(100),
+               keyStateKey("stream", "g", "k1"): presentAt(100),
+               keyStateKey("stream", "g", "k2"): presentAt(100),
+               keyStateKey("stream", "g", "k3"): presentAt(100),
+       }} // missing: k4
+       tier1 := newFakeTier([]string{"peer-A", "peer-B"}, 
map[string]clusterv1.NodeSchemaStatusServiceClient{
+               "peer-A": peerA,
+               "peer-B": peerB,
+       })
+       svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: 
newFakeTier(nil, nil), self: "self-liaison"}).build()
+
+       resp, err := svc.AwaitSchemaApplied(context.Background(), 
&schemav1.AwaitSchemaAppliedRequest{
+               Keys:    keys,
+               Timeout: durationpb.New(60 * time.Millisecond),
+       })
+       require.NoError(t, err)
+       assert.False(t, resp.GetApplied())
+       require.Len(t, resp.GetLaggards(), 2)
+
+       la := laggardByNode(resp.GetLaggards(), "liaison-peer-A")
+       require.NotNil(t, la, "laggard for peer-A must be present")
+       assert.Equal(t, []string{"k2", "k3", "k4"}, 
schemaKeyNames(la.GetMissingKeys()))
+
+       lb := laggardByNode(resp.GetLaggards(), "liaison-peer-B")
+       require.NotNil(t, lb, "laggard for peer-B must be present")
+       assert.Equal(t, []string{"k4"}, schemaKeyNames(lb.GetMissingKeys()))
+}
+
+// schemaKeyNames returns the Name of every SchemaKey, sorted ascending, so
+// assertions can compare against a fixed list without depending on the
+// order in which keys were reported.
+func schemaKeyNames(keys []*schemav1.SchemaKey) []string {
+       names := make([]string, 0, len(keys))
+       for _, key := range keys {
+               names = append(names, key.GetName())
+       }
+       sort.Strings(names)
+       return names
+}
+
+// TestAwaitSchemaApplied_FanOut_ChunkedKeys verifies that a request whose
+// key count exceeds barrierKeyChunkSize (1000) triggers multiple per-peer
+// RPCs, with the per-chunk responses correctly aggregated into a single
+// missing_keys slice for that peer.
+func TestAwaitSchemaApplied_FanOut_ChunkedKeys(t *testing.T) {
+       const total = 1500
+       keys := make([]*schemav1.SchemaKey, total)
+       cacheKeys := make(map[string]int64, total)
+       for i := range total {
+               name := "k" + strconv.Itoa(i)
+               keys[i] = streamKey("g", name)
+               cacheKeys["stream_g/"+name] = 100 // self has them all
+       }
+       cache := &staticBarrierCache{maxModRevision: 100, keys: cacheKeys}
+
+       var peerCalls int32
+       peer := &fakeNodeStatusClient{
+               keyState:     map[string]fakeKeyState{}, // peer has nothing → 
every key missing
+               keyRevsCalls: &peerCalls,
+       }
+       tier1 := newFakeTier([]string{"peer"}, 
map[string]clusterv1.NodeSchemaStatusServiceClient{"peer": peer})
+       svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: 
newFakeTier(nil, nil), self: "self-liaison"}).build()
+
+       resp, err := svc.AwaitSchemaApplied(context.Background(), 
&schemav1.AwaitSchemaAppliedRequest{
+               Keys:    keys,
+               Timeout: durationpb.New(40 * time.Millisecond),
+       })
+       require.NoError(t, err)
+       assert.False(t, resp.GetApplied())
+
+       // Expect 2 chunks (1000 + 500) per iteration. The loop may run multiple
+       // iterations within the timeout; assert at least one full pass 
happened.
+       assert.GreaterOrEqual(t, atomic.LoadInt32(&peerCalls), int32(2),
+               "a 1500-key request must produce >= 2 GetKeyRevisions calls per 
peer (chunked at 1000)")
+       assert.Equal(t, int32(0), atomic.LoadInt32(&peerCalls)%2,
+               "chunks per iteration is 2 (1000 + 500); call count should be a 
multiple of 2")
+
+       // All 1500 keys missing from peer; aggregated across both chunks.
+       la := laggardByNode(resp.GetLaggards(), "liaison-peer")
+       require.NotNil(t, la)
+       assert.Len(t, la.GetMissingKeys(), total,
+               "per-peer missing_keys aggregates both chunks")
+}
+
+// TestAwaitSchemaApplied_FanOut_SharedDeadline regresses the plan §2.3
+// contract that each per-node, per-chunk RPC inherits time.Until(deadline)
+// rather than req.Timeout / N. With per-chunk delays larger than the call's
+// total budget, the call must complete around req.Timeout — NOT N × delay.
+func TestAwaitSchemaApplied_FanOut_SharedDeadline(t *testing.T) {
+       const total = 1500
+       keys := make([]*schemav1.SchemaKey, total)
+       for i := range total {
+               keys[i] = streamKey("g", "k"+strconv.Itoa(i))
+       }
+       cache := &staticBarrierCache{maxModRevision: 100} // no keys → self 
also missing
+
+       peer := &fakeNodeStatusClient{
+               keyState:     map[string]fakeKeyState{},
+               keyRevsDelay: 100 * time.Millisecond, // each chunk is slow
+       }
+       tier1 := newFakeTier([]string{"peer"}, 
map[string]clusterv1.NodeSchemaStatusServiceClient{"peer": peer})
+       svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: 
newFakeTier(nil, nil), self: "self-liaison"}).build()
+
+       start := time.Now()
+       resp, err := svc.AwaitSchemaApplied(context.Background(), 
&schemav1.AwaitSchemaAppliedRequest{
+               Keys:    keys,
+               Timeout: durationpb.New(50 * time.Millisecond),
+       })
+       elapsed := time.Since(start)
+       require.NoError(t, err)
+       assert.False(t, resp.GetApplied())
+       // Without shared deadline the call would take 2 × 100ms = 200ms.
+       // With shared deadline the entire call respects req.Timeout (50ms)
+       // regardless of how many chunks would otherwise be issued.
+       assert.Less(t, elapsed, 180*time.Millisecond,
+               "shared deadline must bound total wall-clock at req.Timeout, 
not N × per-chunk delay")
+}
+
+// TestAwaitSchemaApplied_FanOut_PeerUnimplemented_TreatedAsReady locks the
+// cross-version policy: a peer (or data node) that returns
+// codes.Unimplemented from GetKeyRevisions — i.e. a Phase-1 v0.11 / v0.12
+// node — is treated as ready (assume every key applied) so partial-upgrade
+// clusters do not deadlock barrier callers.
+func TestAwaitSchemaApplied_FanOut_PeerUnimplemented_TreatedAsReady(t 
*testing.T) {
+       keys := []*schemav1.SchemaKey{streamKey("g", "k0"), streamKey("g", 
"k1")}
+       cache := &staticBarrierCache{maxModRevision: 100, keys: 
map[string]int64{
+               "stream_g/k0": 100, "stream_g/k1": 100,
+       }}
+       legacy := &fakeNodeStatusClient{err: status.Error(codes.Unimplemented, 
"phase 1 node")}
+       tier1 := newFakeTier([]string{"phase1"}, 
map[string]clusterv1.NodeSchemaStatusServiceClient{
+               "phase1": legacy,
+       })
+       svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: 
newFakeTier(nil, nil), self: "self-liaison"}).build()
+
+       resp, err := svc.AwaitSchemaApplied(context.Background(), 
&schemav1.AwaitSchemaAppliedRequest{
+               Keys:    keys,
+               Timeout: durationpb.New(50 * time.Millisecond),
+       })
+       require.NoError(t, err)
+       assert.True(t, resp.GetApplied(),
+               "Unimplemented from a Phase-1 peer must not block 
AwaitSchemaApplied")
+       assert.Empty(t, resp.GetLaggards())
+}

Reply via email to