This is an automated email from the ASF dual-hosted git repository.

wu-sheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new c936ce8b7 feat(barrier): cluster fan-out for AwaitRevisionApplied 
(Phase 2 Step 2.2) (#1111)
c936ce8b7 is described below

commit c936ce8b790017055727656cbfb8197c83457aaf
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed May 6 17:09:28 2026 +0800

    feat(barrier): cluster fan-out for AwaitRevisionApplied (Phase 2 Step 2.2) 
(#1111)
---
 CHANGES.md                                   |   3 +-
 banyand/liaison/grpc/barrier.go              |  41 +++-
 banyand/liaison/grpc/barrier_cluster.go      | 254 +++++++++++++++++++++
 banyand/liaison/grpc/barrier_cluster_test.go | 321 +++++++++++++++++++++++++++
 banyand/liaison/grpc/server.go               |  29 ++-
 5 files changed, 639 insertions(+), 9 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index a0c3dc0a9..daad0e5da 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -33,7 +33,8 @@ Release Notes.
   - Guard `pkg/schema/cache` against out-of-order `EventDelete` events; expose 
monotonic `LatestModRevision` watermark.
 - Schema consistency (Phase 2 in progress): cluster-wide barrier groundwork. 
Internal-only; no client-facing surface impact yet.
   - Add `NodeSchemaStatusService` (`GetMaxRevision`, `GetKeyRevisions`, 
`GetAbsentKeys`) registered on every cluster member that holds a schema cache, 
so peer liaisons and data nodes can be probed identically by the upcoming 
barrier fan-out (#1108).
-  - Extend `queue.Client` with `NewNodeSchemaStatusClient(node)` so the 
barrier fan-out can borrow the existing tier1/tier2 connection pools instead of 
opening a parallel mesh.
+  - Extend `queue.Client` with `NewNodeSchemaStatusClient(node)` so the 
barrier fan-out can borrow the existing tier1/tier2 connection pools instead of 
opening a parallel mesh (#1109).
+  - `AwaitRevisionApplied` now fans out across the receiving liaison's frozen 
tier1 (peer-liaison) + tier2 (data-node) Active set, probing each member in 
parallel via `GetMaxRevision` with a shared per-call deadline. Cross-version 
peers returning `codes.Unimplemented` are treated as ready so partial-upgrade 
clusters do not deadlock; transient RPC errors count as per-iteration laggards. 
An empty Active set fails fast with `codes.Unavailable` (#1111).
 
 ### Bug Fixes
 
diff --git a/banyand/liaison/grpc/barrier.go b/banyand/liaison/grpc/barrier.go
index 8e135a2e8..098301935 100644
--- a/banyand/liaison/grpc/barrier.go
+++ b/banyand/liaison/grpc/barrier.go
@@ -27,6 +27,7 @@ import (
 
        schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
 )
 
 const (
@@ -52,12 +53,38 @@ type barrierCacheReader interface {
 type barrierService struct {
        schemav1.UnimplementedSchemaBarrierServiceServer
        cacheProvider func() barrierCacheReader
+       // peerLiaisons / dataNodes / selfName are the Phase 2.2 cluster fan-out
+       // dependencies. When all three are set, AwaitRevisionApplied probes the
+       // frozen self+tier1+tier2 watched set in parallel; when any is nil it
+       // falls back to the legacy in-process loop (used by Phase 1 unit tests).
+       peerLiaisons func() queue.Client
+       dataNodes    func() queue.Client
+       selfName     func() string
 }
 
 func newBarrierService(cacheProvider func() barrierCacheReader) 
*barrierService {
        return &barrierService{cacheProvider: cacheProvider}
 }
 
+// newBarrierServiceCluster wires the Phase 2.2 cluster fan-out dependencies
+// alongside the cache provider. The receiving liaison's name and the per-tier
+// queue clients are resolved via closures so they can be captured at gRPC
+// server construction time and lazily evaluated once PreRun finishes
+// populating curNode and the connection pools.
+func newBarrierServiceCluster(
+       cacheProvider func() barrierCacheReader,
+       peerLiaisons func() queue.Client,
+       dataNodes func() queue.Client,
+       selfName func() string,
+) *barrierService {
+       return &barrierService{
+               cacheProvider: cacheProvider,
+               peerLiaisons:  peerLiaisons,
+               dataNodes:     dataNodes,
+               selfName:      selfName,
+       }
+}
+
 // cache resolves the current barrier cache reader. It returns nil if the
 // underlying schema registry is not yet initialized.
 func (b *barrierService) cache() barrierCacheReader {
@@ -67,11 +94,17 @@ func (b *barrierService) cache() barrierCacheReader {
        return b.cacheProvider()
 }
 
-// AwaitRevisionApplied blocks until the cache's max modRevision is >= 
req.MinRevision
-// or the timeout elapses. In standalone mode there is one node, so Laggards 
carries a
-// single entry whose current_mod_revision reports the cache watermark — this 
lets
-// callers diagnose how far behind the standalone cache is even when 
applied=false.
+// AwaitRevisionApplied blocks until every node in the watched set reports a
+// max modRevision at or above req.MinRevision. When the cluster fan-out
+// dependencies are wired (production), the call probes the frozen tier1 +
+// tier2 + self watched set in parallel via NodeSchemaStatusService; without
+// those dependencies (Phase 1 unit-test path), it falls back to a single
+// in-process cache poll, returning a self-only laggard on timeout so callers
+// can diagnose how far behind the standalone cache is.
 func (b *barrierService) AwaitRevisionApplied(ctx context.Context, req 
*schemav1.AwaitRevisionAppliedRequest) (*schemav1.AwaitRevisionAppliedResponse, 
error) {
+       if b.peerLiaisons != nil && b.dataNodes != nil && b.selfName != nil {
+               return b.awaitRevisionAppliedCluster(ctx, req)
+       }
        deadline := time.Now().Add(barrierDeadlineDuration(req.GetTimeout()))
        pollCtx, cancel := context.WithDeadline(ctx, deadline)
        defer cancel()
diff --git a/banyand/liaison/grpc/barrier_cluster.go 
b/banyand/liaison/grpc/barrier_cluster.go
new file mode 100644
index 000000000..8cfab2938
--- /dev/null
+++ b/banyand/liaison/grpc/barrier_cluster.go
@@ -0,0 +1,254 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "context"
+       "errors"
+       "sync"
+       "time"
+
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
+
+       clusterv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+)
+
+// memberRole identifies the role under which a watched-set member was 
discovered.
+type memberRole int
+
+const (
+       roleLiaison memberRole = iota
+       roleData
+)
+
+func (r memberRole) String() string {
+       switch r {
+       case roleLiaison:
+               return "liaison"
+       case roleData:
+               return "data"
+       default:
+               return "unknown_role"
+       }
+}
+
+// member is a single cluster member in the frozen watched set built at the
+// start of an Await* call.
+type member struct {
+       name   string
+       role   memberRole
+       isSelf bool
+}
+
+// laggardName returns the addressable laggard identifier per the plan's
+// `<role>-<Metadata.Name>` convention.
+func (m member) laggardName() string {
+       return m.role.String() + "-" + m.name
+}
+
+// snapshotMembers builds the frozen watched set from the receiving liaison's
+// in-process self plus the tier1 (peer-liaison) and tier2 (data-node) Active
+// route tables. Dedup is by Metadata.Name with the first-seen tier winning,
+// so a hybrid host running both roles is probed exactly once: once via self
+// if it is the receiving liaison, otherwise via tier1.
+//
+// Standalone fallback: when both tier route tables are empty (the local
+// pipeline returns an empty RouteTable) the watched set degenerates to
+// {self}, which the probe loop handles uniformly with the multi-member case.
+func (b *barrierService) snapshotMembers() []member {
+       seen := map[string]struct{}{}
+       var watched []member
+
+       if b.selfName != nil {
+               if name := b.selfName(); name != "" {
+                       seen[name] = struct{}{}
+                       watched = append(watched, member{name: name, role: 
roleLiaison, isSelf: true})
+               }
+       }
+
+       addFromTier := func(provider func() queue.Client, role memberRole) {
+               if provider == nil {
+                       return
+               }
+               client := provider()
+               if client == nil {
+                       return
+               }
+               rt := client.GetRouteTable()
+               if rt == nil {
+                       return
+               }
+               for _, name := range rt.GetActive() {
+                       if _, dup := seen[name]; dup {
+                               continue
+                       }
+                       seen[name] = struct{}{}
+                       watched = append(watched, member{name: name, role: 
role})
+               }
+       }
+       addFromTier(b.peerLiaisons, roleLiaison)
+       addFromTier(b.dataNodes, roleData)
+
+       return watched
+}
+
+// probeResult is the outcome of a single per-iteration probe of one member.
+//
+// ready=true means the member is at or above the target revision (or returned
+// codes.Unimplemented, which the cross-version policy treats as ready). When
+// ready=false, err!=nil (with rev=0) marks a probe failure that the next
+// iteration retries, while err==nil means the member answered but is behind
+// (rev is its watermark; 0 for self when the local cache is not initialized).
+type probeResult struct {
+       err    error
+       member member
+       rev    int64
+       ready  bool
+}
+
+// awaitRevisionAppliedCluster runs the cluster-wide fan-out for
+// AwaitRevisionApplied. The receiving liaison's own cache is probed
+// in-process; peers are probed via clusterv1.NodeSchemaStatusService over the
+// *grpc.ClientConn borrowed from queue.Client.
+func (b *barrierService) awaitRevisionAppliedCluster(ctx context.Context, req 
*schemav1.AwaitRevisionAppliedRequest) (*schemav1.AwaitRevisionAppliedResponse, 
error) {
+       members := b.snapshotMembers()
+       if len(members) == 0 {
+               return nil, status.Errorf(codes.Unavailable, "no active cluster 
members")
+       }
+
+       deadline := time.Now().Add(barrierDeadlineDuration(req.GetTimeout()))
+       pollCtx, cancel := context.WithDeadline(ctx, deadline)
+       defer cancel()
+
+       interval := barrierInitInterval
+       var lastResults []probeResult
+       for {
+               lastResults = b.probeMembers(pollCtx, members, 
req.GetMinRevision(), deadline)
+               if allReady(lastResults) {
+                       return &schemav1.AwaitRevisionAppliedResponse{Applied: 
true}, nil
+               }
+               if time.Now().After(deadline) {
+                       return &schemav1.AwaitRevisionAppliedResponse{
+                               Applied:  false,
+                               Laggards: revisionLaggards(lastResults),
+                       }, nil
+               }
+               select {
+               case <-time.After(interval):
+               case <-pollCtx.Done():
+                       return &schemav1.AwaitRevisionAppliedResponse{
+                               Applied:  false,
+                               Laggards: revisionLaggards(lastResults),
+                       }, nil
+               }
+               interval = barrierBackoff(interval)
+       }
+}
+
+// probeMembers runs one parallel iteration of GetMaxRevision probes against
+// the watched set. Each per-member probe context inherits the call-wide
+// deadline (shared, not divided across N members) so the loop's wall-clock is
+// bounded by req.timeout regardless of fan-out width.
+func (b *barrierService) probeMembers(ctx context.Context, members []member, 
minRev int64, deadline time.Time) []probeResult {
+       results := make([]probeResult, len(members))
+       var wg sync.WaitGroup
+       probeCtx, cancel := context.WithDeadline(ctx, deadline)
+       defer cancel()
+       for i := range members {
+               wg.Add(1)
+               go func(idx int) {
+                       defer wg.Done()
+                       results[idx] = b.probeOne(probeCtx, members[idx], 
minRev)
+               }(i)
+       }
+       wg.Wait()
+       return results
+}
+
+// probeOne returns the per-member outcome for one iteration.
+func (b *barrierService) probeOne(ctx context.Context, m member, minRev int64) 
probeResult {
+       if m.isSelf {
+               c := b.cache()
+               if c == nil {
+                       return probeResult{member: m}
+               }
+               rev := c.GetMaxModRevision()
+               return probeResult{member: m, rev: rev, ready: rev >= minRev}
+       }
+
+       tier := b.peerLiaisons
+       if m.role == roleData {
+               tier = b.dataNodes
+       }
+       if tier == nil {
+               return probeResult{member: m, err: errors.New("no tier client 
wired")}
+       }
+       client := tier()
+       if client == nil {
+               return probeResult{member: m, err: errors.New("tier client 
unavailable")}
+       }
+       statusClient, err := client.NewNodeSchemaStatusClient(m.name)
+       if err != nil {
+               return probeResult{member: m, err: err}
+       }
+       resp, rpcErr := statusClient.GetMaxRevision(ctx, 
&clusterv1.GetMaxRevisionRequest{})
+       if rpcErr != nil {
+               // Cross-version policy: a Phase-1 peer that does not implement
+               // NodeSchemaStatusService returns codes.Unimplemented; treat 
that
+               // member as ready (assume max_revision = ∞) so partial-upgrade
+               // clusters do not deadlock all barrier callers.
+               if status.Code(rpcErr) == codes.Unimplemented {
+                       return probeResult{member: m, ready: true}
+               }
+               // Any other RPC error counts as a transient laggard for this
+               // iteration only — the next backoff iteration retries it.
+               return probeResult{member: m, err: rpcErr}
+       }
+       rev := resp.GetMaxModRevision()
+       return probeResult{member: m, rev: rev, ready: rev >= minRev}
+}
+
+// allReady reports whether every member's most recent probe returned ready.
+func allReady(results []probeResult) bool {
+       for _, r := range results {
+               if !r.ready {
+                       return false
+               }
+       }
+       return true
+}
+
+// revisionLaggards builds the laggards list for the timeout response,
+// preserving the watched-set order. Cross-version-ready and ready-this-pass
+// members are excluded so the list contains only the actual stragglers.
+func revisionLaggards(results []probeResult) []*schemav1.NodeLaggard {
+       laggards := make([]*schemav1.NodeLaggard, 0)
+       for _, r := range results {
+               if r.ready {
+                       continue
+               }
+               laggards = append(laggards, &schemav1.NodeLaggard{
+                       Node:               r.member.laggardName(),
+                       CurrentModRevision: r.rev,
+               })
+       }
+       return laggards
+}
diff --git a/banyand/liaison/grpc/barrier_cluster_test.go 
b/banyand/liaison/grpc/barrier_cluster_test.go
new file mode 100644
index 000000000..21a828d7c
--- /dev/null
+++ b/banyand/liaison/grpc/barrier_cluster_test.go
@@ -0,0 +1,321 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "context"
+       "errors"
+       "sort"
+       "sync/atomic"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
+       "google.golang.org/protobuf/types/known/durationpb"
+
+       clusterv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+)
+
+// fakeNodeStatusClient is a minimal clusterv1.NodeSchemaStatusServiceClient.
+// Only GetMaxRevision is implemented for Phase 2.2 tests; the other RPCs
+// land in 2.3/2.4 and panic if a test accidentally invokes them.
+type fakeNodeStatusClient struct {
+       err      error
+       callsRef *int32
+       maxRev   int64
+       delay    time.Duration
+}
+
+func (f *fakeNodeStatusClient) GetMaxRevision(ctx context.Context, _ 
*clusterv1.GetMaxRevisionRequest, _ ...grpc.CallOption) 
(*clusterv1.GetMaxRevisionResponse, error) {
+       if f.callsRef != nil {
+               atomic.AddInt32(f.callsRef, 1)
+       }
+       if f.delay > 0 {
+               select {
+               case <-time.After(f.delay):
+               case <-ctx.Done():
+                       return nil, ctx.Err()
+               }
+       }
+       if f.err != nil {
+               return nil, f.err
+       }
+       return &clusterv1.GetMaxRevisionResponse{MaxModRevision: f.maxRev}, nil
+}
+
+func (*fakeNodeStatusClient) GetKeyRevisions(_ context.Context, _ 
*clusterv1.GetKeyRevisionsRequest, _ ...grpc.CallOption) 
(*clusterv1.GetKeyRevisionsResponse, error) {
+       panic("GetKeyRevisions: unused in Phase 2.2 tests")
+}
+
+func (*fakeNodeStatusClient) GetAbsentKeys(_ context.Context, _ 
*clusterv1.GetAbsentKeysRequest, _ ...grpc.CallOption) 
(*clusterv1.GetAbsentKeysResponse, error) {
+       panic("GetAbsentKeys: unused in Phase 2.2 tests")
+}
+
+// fakeQueueClient embeds queue.Client (a nil interface) so unused methods
+// panic at runtime; only the two methods the barrier fan-out actually calls
+// are overridden.
+type fakeQueueClient struct {
+       queue.Client
+       routeTable    *databasev1.RouteTable
+       statusClients map[string]clusterv1.NodeSchemaStatusServiceClient
+}
+
+func (f *fakeQueueClient) GetRouteTable() *databasev1.RouteTable {
+       if f.routeTable == nil {
+               return &databasev1.RouteTable{}
+       }
+       return f.routeTable
+}
+
+func (f *fakeQueueClient) NewNodeSchemaStatusClient(node string) 
(clusterv1.NodeSchemaStatusServiceClient, error) {
+       c, ok := f.statusClients[node]
+       if !ok {
+               return nil, errors.New("no fake client for node " + node)
+       }
+       return c, nil
+}
+
+func newFakeTier(active []string, clients 
map[string]clusterv1.NodeSchemaStatusServiceClient) *fakeQueueClient {
+       return &fakeQueueClient{
+               routeTable:    &databasev1.RouteTable{Active: active},
+               statusClients: clients,
+       }
+}
+
+// clusterFixture wires a barrier service for the cluster fan-out tests.
+type clusterFixture struct {
+       cache        barrierCacheReader
+       tier1, tier2 *fakeQueueClient
+       self         string
+}
+
+func (f *clusterFixture) build() *barrierService {
+       return newBarrierServiceCluster(
+               func() barrierCacheReader { return f.cache },
+               func() queue.Client { return f.tier1 },
+               func() queue.Client { return f.tier2 },
+               func() string { return f.self },
+       )
+}
+
+func laggardNames(laggards []*schemav1.NodeLaggard) []string {
+       out := make([]string, len(laggards))
+       for i, l := range laggards {
+               out[i] = l.GetNode()
+       }
+       sort.Strings(out)
+       return out
+}
+
+// TestFanOut_AllNodesReady_ReturnsApplied verifies that when every member
+// (self + peer liaisons + data nodes) reports max_rev >= min_rev, the call
+// returns Applied=true on the first iteration.
+func TestFanOut_AllNodesReady_ReturnsApplied(t *testing.T) {
+       const target = int64(100)
+       cache := &staticBarrierCache{maxModRevision: 200}
+       tier1 := newFakeTier([]string{"liaison-1"}, 
map[string]clusterv1.NodeSchemaStatusServiceClient{
+               "liaison-1": &fakeNodeStatusClient{maxRev: 150},
+       })
+       tier2 := newFakeTier([]string{"data-1", "data-2"}, 
map[string]clusterv1.NodeSchemaStatusServiceClient{
+               "data-1": &fakeNodeStatusClient{maxRev: 110},
+               "data-2": &fakeNodeStatusClient{maxRev: target},
+       })
+       svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: tier2, self: 
"self-liaison"}).build()
+
+       resp, err := svc.AwaitRevisionApplied(context.Background(), 
&schemav1.AwaitRevisionAppliedRequest{
+               MinRevision: target,
+               Timeout:     durationpb.New(50 * time.Millisecond),
+       })
+       require.NoError(t, err)
+       assert.True(t, resp.GetApplied())
+       assert.Empty(t, resp.GetLaggards())
+}
+
+// TestFanOut_OneLaggard_ReportsOnTimeout verifies that when one peer remains
+// behind the target on every iteration, the timeout response names exactly
+// that laggard with its addressable role-prefixed identifier.
+func TestFanOut_OneLaggard_ReportsOnTimeout(t *testing.T) {
+       cache := &staticBarrierCache{maxModRevision: 100}
+       tier1 := newFakeTier(nil, nil)
+       tier2 := newFakeTier([]string{"data-1", "data-slow"}, 
map[string]clusterv1.NodeSchemaStatusServiceClient{
+               "data-1":    &fakeNodeStatusClient{maxRev: 100},
+               "data-slow": &fakeNodeStatusClient{maxRev: 42}, // never 
catches up
+       })
+       svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: tier2, self: 
"self-liaison"}).build()
+
+       resp, err := svc.AwaitRevisionApplied(context.Background(), 
&schemav1.AwaitRevisionAppliedRequest{
+               MinRevision: 100,
+               Timeout:     durationpb.New(80 * time.Millisecond),
+       })
+       require.NoError(t, err)
+       assert.False(t, resp.GetApplied())
+       require.Len(t, resp.GetLaggards(), 1)
+       assert.Equal(t, "data-data-slow", resp.GetLaggards()[0].GetNode())
+       assert.Equal(t, int64(42), 
resp.GetLaggards()[0].GetCurrentModRevision())
+}
+
+// TestFanOut_AllLaggards_ListsAll verifies that when every probed peer is
+// behind the target, all of them appear in the laggards list with their
+// role-prefixed names.
+func TestFanOut_AllLaggards_ListsAll(t *testing.T) {
+       cache := &staticBarrierCache{maxModRevision: 0} // self also behind
+       tier1 := newFakeTier([]string{"peer-l"}, 
map[string]clusterv1.NodeSchemaStatusServiceClient{
+               "peer-l": &fakeNodeStatusClient{maxRev: 5},
+       })
+       tier2 := newFakeTier([]string{"peer-d"}, 
map[string]clusterv1.NodeSchemaStatusServiceClient{
+               "peer-d": &fakeNodeStatusClient{maxRev: 7},
+       })
+       svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: tier2, self: 
"self-liaison"}).build()
+
+       resp, err := svc.AwaitRevisionApplied(context.Background(), 
&schemav1.AwaitRevisionAppliedRequest{
+               MinRevision: 100,
+               Timeout:     durationpb.New(60 * time.Millisecond),
+       })
+       require.NoError(t, err)
+       assert.False(t, resp.GetApplied())
+       assert.Equal(t, []string{"data-peer-d", "liaison-peer-l", 
"liaison-self-liaison"},
+               laggardNames(resp.GetLaggards()))
+}
+
+// TestFanOut_NodeRPCErrors_CountedAsLaggard verifies that a non-Unimplemented
+// gRPC error from a per-member probe makes that member a laggard for the
+// iteration but does not fail the whole call. The fake peer never recovers,
+// so the call times out and names it as the only laggard.
+func TestFanOut_NodeRPCErrors_CountedAsLaggard(t *testing.T) {
+       cache := &staticBarrierCache{maxModRevision: 100}
+       errClient := &fakeNodeStatusClient{err: status.Errorf(codes.Internal, 
"boom")}
+       tier1 := newFakeTier(nil, nil)
+       tier2 := newFakeTier([]string{"sick"}, 
map[string]clusterv1.NodeSchemaStatusServiceClient{
+               "sick": errClient,
+       })
+       svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: tier2, self: 
"self-liaison"}).build()
+
+       resp, err := svc.AwaitRevisionApplied(context.Background(), 
&schemav1.AwaitRevisionAppliedRequest{
+               MinRevision: 100,
+               Timeout:     durationpb.New(60 * time.Millisecond),
+       })
+       require.NoError(t, err)
+       assert.False(t, resp.GetApplied())
+       require.Len(t, resp.GetLaggards(), 1)
+       assert.Equal(t, "data-sick", resp.GetLaggards()[0].GetNode())
+}
+
+// TestFanOut_EmptyActiveSet_ReturnsUnavailable verifies that when the snapshot
+// finds zero members (no self, no peers, no data nodes) the call fails fast
+// with codes.Unavailable rather than parking on an empty watched set.
+func TestFanOut_EmptyActiveSet_ReturnsUnavailable(t *testing.T) {
+       svc := (&clusterFixture{
+               cache: &staticBarrierCache{},
+               tier1: newFakeTier(nil, nil),
+               tier2: newFakeTier(nil, nil),
+               self:  "", // empty self → no member contributed
+       }).build()
+
+       resp, err := svc.AwaitRevisionApplied(context.Background(), 
&schemav1.AwaitRevisionAppliedRequest{
+               MinRevision: 1,
+               Timeout:     durationpb.New(20 * time.Millisecond),
+       })
+       require.Error(t, err)
+       assert.Nil(t, resp)
+       assert.Equal(t, codes.Unavailable, status.Code(err))
+}
+
+// TestFanOut_ProbeIsShortUnary_NoServerWait verifies that a slow per-member
+// probe (the fake delays past the call deadline) is treated as a transient
+// laggard rather than as a successful long-poll. The whole call returns
+// applied=false within the timeout instead of blocking on the slow probe.
+func TestFanOut_ProbeIsShortUnary_NoServerWait(t *testing.T) {
+       cache := &staticBarrierCache{maxModRevision: 100}
+       slow := &fakeNodeStatusClient{delay: 200 * time.Millisecond, maxRev: 
100}
+       tier2 := newFakeTier([]string{"slow"}, 
map[string]clusterv1.NodeSchemaStatusServiceClient{
+               "slow": slow,
+       })
+       svc := (&clusterFixture{cache: cache, tier1: newFakeTier(nil, nil), 
tier2: tier2, self: "self-liaison"}).build()
+
+       start := time.Now()
+       resp, err := svc.AwaitRevisionApplied(context.Background(), 
&schemav1.AwaitRevisionAppliedRequest{
+               MinRevision: 100,
+               Timeout:     durationpb.New(60 * time.Millisecond),
+       })
+       elapsed := time.Since(start)
+       require.NoError(t, err)
+       assert.False(t, resp.GetApplied())
+       assert.Less(t, elapsed, 500*time.Millisecond,
+               "call must respect its own deadline rather than waiting for the 
slow per-probe response")
+       require.Len(t, resp.GetLaggards(), 1)
+       assert.Equal(t, "data-slow", resp.GetLaggards()[0].GetNode())
+}
+
+// TestFanOut_BackoffBounded verifies that when convergence requires multiple
+// iterations the caller observes >= 2 probes per member but the per-iteration
+// sleep stays bounded so the overall call respects its timeout. The fake
+// peer counts how many times GetMaxRevision was called; the loop should make
+// at least 2 probes inside an 80ms budget.
+func TestFanOut_BackoffBounded(t *testing.T) {
+       var calls int32
+       cache := &staticBarrierCache{maxModRevision: 100}
+       peer := &fakeNodeStatusClient{maxRev: 50, callsRef: &calls} // 
permanently behind
+       tier2 := newFakeTier([]string{"laggy"}, 
map[string]clusterv1.NodeSchemaStatusServiceClient{
+               "laggy": peer,
+       })
+       svc := (&clusterFixture{cache: cache, tier1: newFakeTier(nil, nil), 
tier2: tier2, self: "self-liaison"}).build()
+
+       start := time.Now()
+       _, err := svc.AwaitRevisionApplied(context.Background(), 
&schemav1.AwaitRevisionAppliedRequest{
+               MinRevision: 100,
+               Timeout:     durationpb.New(80 * time.Millisecond),
+       })
+       elapsed := time.Since(start)
+       require.NoError(t, err)
+       assert.GreaterOrEqual(t, atomic.LoadInt32(&calls), int32(2),
+               "backoff loop must make multiple probes within the timeout 
window")
+       // 80ms budget plus init/grow/cap sleeps; even the worst-case wall-clock
+       // stays well under 500ms = barrierMaxInterval.
+       assert.Less(t, elapsed, 500*time.Millisecond,
+               "timeout must not be exceeded by the backoff schedule")
+}
+
+// TestFanOut_NodeReturnsUnimplemented_TreatedAsReady locks the cross-version
+// policy: a peer (or data node) that returns codes.Unimplemented from
+// NodeSchemaStatusService — i.e. a Phase-1 v0.11/v0.12 node — is treated as
+// ready (assume max_revision = ∞) so partial-upgrade clusters do not deadlock
+// barrier callers.
+func TestFanOut_NodeReturnsUnimplemented_TreatedAsReady(t *testing.T) {
+       cache := &staticBarrierCache{maxModRevision: 100}
+       legacy := &fakeNodeStatusClient{err: status.Error(codes.Unimplemented, 
"phase 1 node")}
+       tier2 := newFakeTier([]string{"phase1"}, 
map[string]clusterv1.NodeSchemaStatusServiceClient{
+               "phase1": legacy,
+       })
+       svc := (&clusterFixture{cache: cache, tier1: newFakeTier(nil, nil), 
tier2: tier2, self: "self-liaison"}).build()
+
+       resp, err := svc.AwaitRevisionApplied(context.Background(), 
&schemav1.AwaitRevisionAppliedRequest{
+               MinRevision: 100,
+               Timeout:     durationpb.New(50 * time.Millisecond),
+       })
+       require.NoError(t, err)
+       assert.True(t, resp.GetApplied(),
+               "Unimplemented from a Phase-1 peer must not block a v0.13 
barrier caller")
+       assert.Empty(t, resp.GetLaggards())
+}
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index f71c8f9ca..d4b428e3f 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -184,10 +184,10 @@ func NewServer(_ context.Context, tir1Client, tir2Client, 
broadcaster queue.Clie
                propertyServer: propertyService,
        }
 
-       var barrierSVC *barrierService
        var nodeStatusSVC *property.NodeSchemaStatusServer
+       var cacheProvider func() barrierCacheReader
        if svc, svcOk := schemaRegistry.(metadata.Service); svcOk {
-               barrierSVC = newBarrierService(func() barrierCacheReader {
+               cacheProvider = func() barrierCacheReader {
                        inner := svc.SchemaRegistry()
                        if inner == nil {
                                return nil
@@ -197,7 +197,7 @@ func NewServer(_ context.Context, tir1Client, tir2Client, 
broadcaster queue.Clie
                                return nil
                        }
                        return bc
-               })
+               }
                // Phase 2 Step 2.1: every cluster member with a schema cache 
exposes
                // NodeSchemaStatusService so peer liaisons (Step 2.2 fan-out) 
can
                // probe this liaison's cache identically to a data node. The 
receiving
@@ -221,7 +221,6 @@ func NewServer(_ context.Context, tir1Client, tir2Client, 
broadcaster queue.Clie
                traceSVC:      traceSVC,
                bydbQLSVC:     bydbQLSVC,
                groupRepo:     gr,
-               barrierSVC:    barrierSVC,
                nodeStatusSVC: nodeStatusSVC,
                streamRegistryServer: &streamRegistryServer{
                        schemaRegistry: schemaRegistry,
@@ -253,6 +252,28 @@ func NewServer(_ context.Context, tir1Client, tir2Client, 
broadcaster queue.Clie
                protector:           protectorService,
                routeTableProviders: routeProviders,
        }
+       // Phase 2 Step 2.2: wire the cluster-wide AwaitRevisionApplied fan-out.
+       // Tier1 (peer liaisons) and tier2 (data nodes) connection pools are
+       // borrowed via the queue.Client interface added in #1109; the receiving
+       // liaison's name is read from s.curNode, which initCurrentNode 
populates
+       // during PreRun — hence the closure indirection. The explicit nil check
+       // covers the test-context case where ContextNodeKey is unset and
+       // initCurrentNode leaves curNode nil; an empty selfName makes the
+       // barrier exclude self from the watched set, which is the correct
+       // behavior for headless test fixtures.
+       if cacheProvider != nil {
+               s.barrierSVC = newBarrierServiceCluster(
+                       cacheProvider,
+                       func() queue.Client { return tir1Client },
+                       func() queue.Client { return tir2Client },
+                       func() string {
+                               if s.curNode == nil {
+                                       return ""
+                               }
+                               return s.curNode.GetMetadata().GetName()
+                       },
+               )
+       }
        s.accessLogRecorders = []accessLogRecorder{streamSVC, measureSVC, 
traceSVC, s.propertyServer}
        s.queryAccessLogRecorders = []queryAccessLogRecorder{streamSVC, 
measureSVC, traceSVC, s.propertyServer}
 

Reply via email to