This is an automated email from the ASF dual-hosted git repository.
wu-sheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new c936ce8b7 feat(barrier): cluster fan-out for AwaitRevisionApplied
(Phase 2 Step 2.2) (#1111)
c936ce8b7 is described below
commit c936ce8b790017055727656cbfb8197c83457aaf
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed May 6 17:09:28 2026 +0800
feat(barrier): cluster fan-out for AwaitRevisionApplied (Phase 2 Step 2.2)
(#1111)
---
CHANGES.md | 3 +-
banyand/liaison/grpc/barrier.go | 41 +++-
banyand/liaison/grpc/barrier_cluster.go | 254 +++++++++++++++++++++
banyand/liaison/grpc/barrier_cluster_test.go | 321 +++++++++++++++++++++++++++
banyand/liaison/grpc/server.go | 29 ++-
5 files changed, 639 insertions(+), 9 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index a0c3dc0a9..daad0e5da 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -33,7 +33,8 @@ Release Notes.
- Guard `pkg/schema/cache` against out-of-order `EventDelete` events; expose
monotonic `LatestModRevision` watermark.
- Schema consistency (Phase 2 in progress): cluster-wide barrier groundwork.
Internal-only; no client-facing surface impact yet.
- Add `NodeSchemaStatusService` (`GetMaxRevision`, `GetKeyRevisions`,
`GetAbsentKeys`) registered on every cluster member that holds a schema cache,
so peer liaisons and data nodes can be probed identically by the upcoming
barrier fan-out (#1108).
- - Extend `queue.Client` with `NewNodeSchemaStatusClient(node)` so the
barrier fan-out can borrow the existing tier1/tier2 connection pools instead of
opening a parallel mesh.
+ - Extend `queue.Client` with `NewNodeSchemaStatusClient(node)` so the
barrier fan-out can borrow the existing tier1/tier2 connection pools instead of
opening a parallel mesh (#1109).
+ - `AwaitRevisionApplied` now fans out across the receiving liaison itself plus
its frozen tier1 (peer-liaison) + tier2 (data-node) Active set, probing each
member in parallel via `GetMaxRevision` with a shared per-call deadline.
Cross-version peers returning `codes.Unimplemented` are treated as ready so
partial-upgrade clusters do not deadlock; transient RPC errors count as
per-iteration laggards. An empty watched set fails fast with `codes.Unavailable`.
### Bug Fixes
diff --git a/banyand/liaison/grpc/barrier.go b/banyand/liaison/grpc/barrier.go
index 8e135a2e8..098301935 100644
--- a/banyand/liaison/grpc/barrier.go
+++ b/banyand/liaison/grpc/barrier.go
@@ -27,6 +27,7 @@ import (
schemav1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
)
const (
@@ -52,12 +53,38 @@ type barrierCacheReader interface {
type barrierService struct {
schemav1.UnimplementedSchemaBarrierServiceServer
cacheProvider func() barrierCacheReader
+ // peerLiaisons / dataNodes / selfName are the Phase 2.2 cluster fan-out
+ // dependencies. When all three are set, AwaitRevisionApplied probes the
+ // frozen tier1+tier2 watched set in parallel; when any are nil it falls
+ // back to the legacy in-process loop (used by Phase 1 unit tests).
+ peerLiaisons func() queue.Client
+ dataNodes func() queue.Client
+ selfName func() string
}
func newBarrierService(cacheProvider func() barrierCacheReader)
*barrierService {
return &barrierService{cacheProvider: cacheProvider}
}
+// newBarrierServiceCluster wires the Phase 2.2 cluster fan-out dependencies
+// alongside the cache provider. The receiving liaison's name and the per-tier
+// queue clients are resolved via closures so they can be captured at gRPC
+// server construction time and lazily evaluated once PreRun finishes
+// populating curNode and the connection pools.
+func newBarrierServiceCluster(
+ cacheProvider func() barrierCacheReader,
+ peerLiaisons func() queue.Client,
+ dataNodes func() queue.Client,
+ selfName func() string,
+) *barrierService {
+ return &barrierService{
+ cacheProvider: cacheProvider,
+ peerLiaisons: peerLiaisons,
+ dataNodes: dataNodes,
+ selfName: selfName,
+ }
+}
+
// cache resolves the current barrier cache reader. It returns nil if the
// underlying schema registry is not yet initialized.
func (b *barrierService) cache() barrierCacheReader {
@@ -67,11 +94,17 @@ func (b *barrierService) cache() barrierCacheReader {
return b.cacheProvider()
}
-// AwaitRevisionApplied blocks until the cache's max modRevision is >=
req.MinRevision
-// or the timeout elapses. In standalone mode there is one node, so Laggards
carries a
-// single entry whose current_mod_revision reports the cache watermark — this
lets
-// callers diagnose how far behind the standalone cache is even when
applied=false.
+// AwaitRevisionApplied blocks until every node in the watched set reports a
+// max modRevision at or above req.MinRevision. When the cluster fan-out
+// dependencies are wired (production), the call probes the frozen tier1 +
+// tier2 + self watched set in parallel via NodeSchemaStatusService; without
+// those dependencies (Phase 1 unit-test path), it falls back to a single
+// in-process cache poll, returning a self-only laggard on timeout so callers
+// can diagnose how far behind the standalone cache is.
func (b *barrierService) AwaitRevisionApplied(ctx context.Context, req
*schemav1.AwaitRevisionAppliedRequest) (*schemav1.AwaitRevisionAppliedResponse,
error) {
+ if b.peerLiaisons != nil && b.dataNodes != nil && b.selfName != nil {
+ return b.awaitRevisionAppliedCluster(ctx, req)
+ }
deadline := time.Now().Add(barrierDeadlineDuration(req.GetTimeout()))
pollCtx, cancel := context.WithDeadline(ctx, deadline)
defer cancel()
diff --git a/banyand/liaison/grpc/barrier_cluster.go
b/banyand/liaison/grpc/barrier_cluster.go
new file mode 100644
index 000000000..8cfab2938
--- /dev/null
+++ b/banyand/liaison/grpc/barrier_cluster.go
@@ -0,0 +1,254 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "time"
+
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+
+ clusterv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
+ schemav1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+)
+
// memberRole records the tier through which a watched-set member was found.
type memberRole int

const (
	roleLiaison memberRole = iota
	roleData
)

// String returns the role's laggard-name prefix; values outside the declared
// constants render as "unknown_role".
func (r memberRole) String() string {
	if r == roleLiaison {
		return "liaison"
	}
	if r == roleData {
		return "data"
	}
	return "unknown_role"
}

// member is one entry of the watched set frozen at the start of an Await*
// call.
type member struct {
	name   string     // node Metadata.Name
	role   memberRole // tier the member was discovered in
	isSelf bool       // true for the receiving liaison, probed in-process
}

// laggardName renders the member as "<role>-<Metadata.Name>", the addressable
// identifier used in laggard reports.
func (m member) laggardName() string {
	prefix := m.role.String()
	return prefix + "-" + m.name
}
+
+// snapshotMembers builds the frozen watched set from the receiving liaison's
+// in-process self plus the tier1 (peer-liaison) and tier2 (data-node) Active
+// route tables. Dedup is by Metadata.Name with the first-seen tier winning,
+// so a hybrid host running both roles is probed exactly once: once via self
+// if it is the receiving liaison, otherwise via tier1.
+//
+// Standalone fallback: when both tier route tables are empty (the local
+// pipeline returns an empty RouteTable) the watched set degenerates to
+// {self}, which the probe loop handles uniformly with the multi-member case.
+func (b *barrierService) snapshotMembers() []member {
+ seen := map[string]struct{}{}
+ var watched []member
+
+ if b.selfName != nil {
+ if name := b.selfName(); name != "" {
+ seen[name] = struct{}{}
+ watched = append(watched, member{name: name, role:
roleLiaison, isSelf: true})
+ }
+ }
+
+ addFromTier := func(provider func() queue.Client, role memberRole) {
+ if provider == nil {
+ return
+ }
+ client := provider()
+ if client == nil {
+ return
+ }
+ rt := client.GetRouteTable()
+ if rt == nil {
+ return
+ }
+ for _, name := range rt.GetActive() {
+ if _, dup := seen[name]; dup {
+ continue
+ }
+ seen[name] = struct{}{}
+ watched = append(watched, member{name: name, role:
role})
+ }
+ }
+ addFromTier(b.peerLiaisons, roleLiaison)
+ addFromTier(b.dataNodes, roleData)
+
+ return watched
+}
+
+// probeResult is the outcome of a single per-iteration probe of one member.
+//
+// ready=true means the member is at or above the target revision (or returned
+// codes.Unimplemented, which the cross-version policy treats as ready). When
+// ready=false the (rev, err) pair carries whatever the probe last observed —
+// rev=0 + err!=nil indicates a transient RPC failure that the next iteration
+// retries; rev>0 + err==nil indicates the member is online but behind.
+type probeResult struct {
+ err error
+ member member
+ rev int64
+ ready bool
+}
+
+// awaitRevisionAppliedCluster runs the cluster-wide fan-out for
+// AwaitRevisionApplied. The receiving liaison's own cache is probed
+// in-process; peers are probed via clusterv1.NodeSchemaStatusService over the
+// *grpc.ClientConn borrowed from queue.Client.
+func (b *barrierService) awaitRevisionAppliedCluster(ctx context.Context, req
*schemav1.AwaitRevisionAppliedRequest) (*schemav1.AwaitRevisionAppliedResponse,
error) {
+ members := b.snapshotMembers()
+ if len(members) == 0 {
+ return nil, status.Errorf(codes.Unavailable, "no active cluster
members")
+ }
+
+ deadline := time.Now().Add(barrierDeadlineDuration(req.GetTimeout()))
+ pollCtx, cancel := context.WithDeadline(ctx, deadline)
+ defer cancel()
+
+ interval := barrierInitInterval
+ var lastResults []probeResult
+ for {
+ lastResults = b.probeMembers(pollCtx, members,
req.GetMinRevision(), deadline)
+ if allReady(lastResults) {
+ return &schemav1.AwaitRevisionAppliedResponse{Applied:
true}, nil
+ }
+ if time.Now().After(deadline) {
+ return &schemav1.AwaitRevisionAppliedResponse{
+ Applied: false,
+ Laggards: revisionLaggards(lastResults),
+ }, nil
+ }
+ select {
+ case <-time.After(interval):
+ case <-pollCtx.Done():
+ return &schemav1.AwaitRevisionAppliedResponse{
+ Applied: false,
+ Laggards: revisionLaggards(lastResults),
+ }, nil
+ }
+ interval = barrierBackoff(interval)
+ }
+}
+
+// probeMembers runs one parallel iteration of GetMaxRevision probes against
+// the watched set. Each per-member probe context inherits the call-wide
+// deadline (shared, not divided across N members) so the loop's wall-clock is
+// bounded by req.timeout regardless of fan-out width.
+func (b *barrierService) probeMembers(ctx context.Context, members []member,
minRev int64, deadline time.Time) []probeResult {
+ results := make([]probeResult, len(members))
+ var wg sync.WaitGroup
+ probeCtx, cancel := context.WithDeadline(ctx, deadline)
+ defer cancel()
+ for i := range members {
+ wg.Add(1)
+ go func(idx int) {
+ defer wg.Done()
+ results[idx] = b.probeOne(probeCtx, members[idx],
minRev)
+ }(i)
+ }
+ wg.Wait()
+ return results
+}
+
+// probeOne returns the per-member outcome for one iteration.
+func (b *barrierService) probeOne(ctx context.Context, m member, minRev int64)
probeResult {
+ if m.isSelf {
+ c := b.cache()
+ if c == nil {
+ return probeResult{member: m}
+ }
+ rev := c.GetMaxModRevision()
+ return probeResult{member: m, rev: rev, ready: rev >= minRev}
+ }
+
+ tier := b.peerLiaisons
+ if m.role == roleData {
+ tier = b.dataNodes
+ }
+ if tier == nil {
+ return probeResult{member: m, err: errors.New("no tier client
wired")}
+ }
+ client := tier()
+ if client == nil {
+ return probeResult{member: m, err: errors.New("tier client
unavailable")}
+ }
+ statusClient, err := client.NewNodeSchemaStatusClient(m.name)
+ if err != nil {
+ return probeResult{member: m, err: err}
+ }
+ resp, rpcErr := statusClient.GetMaxRevision(ctx,
&clusterv1.GetMaxRevisionRequest{})
+ if rpcErr != nil {
+ // Cross-version policy: a Phase-1 peer that does not implement
+ // NodeSchemaStatusService returns codes.Unimplemented; treat
that
+ // member as ready (assume max_revision = ∞) so partial-upgrade
+ // clusters do not deadlock all barrier callers.
+ if status.Code(rpcErr) == codes.Unimplemented {
+ return probeResult{member: m, ready: true}
+ }
+ // Any other RPC error counts as a transient laggard for this
+ // iteration only — the next backoff iteration retries it.
+ return probeResult{member: m, err: rpcErr}
+ }
+ rev := resp.GetMaxModRevision()
+ return probeResult{member: m, rev: rev, ready: rev >= minRev}
+}
+
+// allReady reports whether every member's most recent probe returned ready.
+func allReady(results []probeResult) bool {
+ for _, r := range results {
+ if !r.ready {
+ return false
+ }
+ }
+ return true
+}
+
+// revisionLaggards builds the laggards list for the timeout response,
+// preserving the watched-set order. Cross-version-ready and ready-this-pass
+// members are excluded so the list contains only the actual stragglers.
+func revisionLaggards(results []probeResult) []*schemav1.NodeLaggard {
+ laggards := make([]*schemav1.NodeLaggard, 0)
+ for _, r := range results {
+ if r.ready {
+ continue
+ }
+ laggards = append(laggards, &schemav1.NodeLaggard{
+ Node: r.member.laggardName(),
+ CurrentModRevision: r.rev,
+ })
+ }
+ return laggards
+}
diff --git a/banyand/liaison/grpc/barrier_cluster_test.go
b/banyand/liaison/grpc/barrier_cluster_test.go
new file mode 100644
index 000000000..21a828d7c
--- /dev/null
+++ b/banyand/liaison/grpc/barrier_cluster_test.go
@@ -0,0 +1,321 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+ "context"
+ "errors"
+ "sort"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "google.golang.org/protobuf/types/known/durationpb"
+
+ clusterv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ schemav1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+)
+
// fakeNodeStatusClient is a minimal clusterv1.NodeSchemaStatusServiceClient
// for the Phase 2.2 tests. Only GetMaxRevision behaves; GetKeyRevisions and
// GetAbsentKeys (Phase 2.3/2.4) panic if a test reaches them by mistake.
type fakeNodeStatusClient struct {
	err      error         // returned by GetMaxRevision (after any delay) when non-nil
	callsRef *int32        // when non-nil, atomically incremented on every GetMaxRevision call
	maxRev   int64         // MaxModRevision reported on success
	delay    time.Duration // artificial latency, cut short if ctx ends first
}
+
+func (f *fakeNodeStatusClient) GetMaxRevision(ctx context.Context, _
*clusterv1.GetMaxRevisionRequest, _ ...grpc.CallOption)
(*clusterv1.GetMaxRevisionResponse, error) {
+ if f.callsRef != nil {
+ atomic.AddInt32(f.callsRef, 1)
+ }
+ if f.delay > 0 {
+ select {
+ case <-time.After(f.delay):
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ }
+ }
+ if f.err != nil {
+ return nil, f.err
+ }
+ return &clusterv1.GetMaxRevisionResponse{MaxModRevision: f.maxRev}, nil
+}
+
+func (*fakeNodeStatusClient) GetKeyRevisions(_ context.Context, _
*clusterv1.GetKeyRevisionsRequest, _ ...grpc.CallOption)
(*clusterv1.GetKeyRevisionsResponse, error) {
+ panic("GetKeyRevisions: unused in Phase 2.2 tests")
+}
+
+func (*fakeNodeStatusClient) GetAbsentKeys(_ context.Context, _
*clusterv1.GetAbsentKeysRequest, _ ...grpc.CallOption)
(*clusterv1.GetAbsentKeysResponse, error) {
+ panic("GetAbsentKeys: unused in Phase 2.2 tests")
+}
+
+// fakeQueueClient embeds queue.Client (a nil interface) so unused methods
+// panic at runtime; only the two methods the barrier fan-out actually calls
+// are overridden.
+type fakeQueueClient struct {
+ queue.Client
+ routeTable *databasev1.RouteTable
+ statusClients map[string]clusterv1.NodeSchemaStatusServiceClient
+}
+
+func (f *fakeQueueClient) GetRouteTable() *databasev1.RouteTable {
+ if f.routeTable == nil {
+ return &databasev1.RouteTable{}
+ }
+ return f.routeTable
+}
+
+func (f *fakeQueueClient) NewNodeSchemaStatusClient(node string)
(clusterv1.NodeSchemaStatusServiceClient, error) {
+ c, ok := f.statusClients[node]
+ if !ok {
+ return nil, errors.New("no fake client for node " + node)
+ }
+ return c, nil
+}
+
+func newFakeTier(active []string, clients
map[string]clusterv1.NodeSchemaStatusServiceClient) *fakeQueueClient {
+ return &fakeQueueClient{
+ routeTable: &databasev1.RouteTable{Active: active},
+ statusClients: clients,
+ }
+}
+
+// clusterFixture wires a barrier service for the cluster fan-out tests.
+type clusterFixture struct {
+ cache barrierCacheReader
+ tier1, tier2 *fakeQueueClient
+ self string
+}
+
+func (f *clusterFixture) build() *barrierService {
+ return newBarrierServiceCluster(
+ func() barrierCacheReader { return f.cache },
+ func() queue.Client { return f.tier1 },
+ func() queue.Client { return f.tier2 },
+ func() string { return f.self },
+ )
+}
+
+func laggardNames(laggards []*schemav1.NodeLaggard) []string {
+ out := make([]string, len(laggards))
+ for i, l := range laggards {
+ out[i] = l.GetNode()
+ }
+ sort.Strings(out)
+ return out
+}
+
+// TestFanOut_AllNodesReady_ReturnsApplied verifies that when every member
+// (self + peer liaisons + data nodes) reports max_rev >= min_rev, the call
+// returns Applied=true on the first iteration.
+func TestFanOut_AllNodesReady_ReturnsApplied(t *testing.T) {
+ const target = int64(100)
+ cache := &staticBarrierCache{maxModRevision: 200}
+ tier1 := newFakeTier([]string{"liaison-1"},
map[string]clusterv1.NodeSchemaStatusServiceClient{
+ "liaison-1": &fakeNodeStatusClient{maxRev: 150},
+ })
+ tier2 := newFakeTier([]string{"data-1", "data-2"},
map[string]clusterv1.NodeSchemaStatusServiceClient{
+ "data-1": &fakeNodeStatusClient{maxRev: 110},
+ "data-2": &fakeNodeStatusClient{maxRev: target},
+ })
+ svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: tier2, self:
"self-liaison"}).build()
+
+ resp, err := svc.AwaitRevisionApplied(context.Background(),
&schemav1.AwaitRevisionAppliedRequest{
+ MinRevision: target,
+ Timeout: durationpb.New(50 * time.Millisecond),
+ })
+ require.NoError(t, err)
+ assert.True(t, resp.GetApplied())
+ assert.Empty(t, resp.GetLaggards())
+}
+
+// TestFanOut_OneLaggard_ReportsOnTimeout verifies that when one peer remains
+// behind the target on every iteration, the timeout response names exactly
+// that laggard with its addressable role-prefixed identifier.
+func TestFanOut_OneLaggard_ReportsOnTimeout(t *testing.T) {
+ cache := &staticBarrierCache{maxModRevision: 100}
+ tier1 := newFakeTier(nil, nil)
+ tier2 := newFakeTier([]string{"data-1", "data-slow"},
map[string]clusterv1.NodeSchemaStatusServiceClient{
+ "data-1": &fakeNodeStatusClient{maxRev: 100},
+ "data-slow": &fakeNodeStatusClient{maxRev: 42}, // never
catches up
+ })
+ svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: tier2, self:
"self-liaison"}).build()
+
+ resp, err := svc.AwaitRevisionApplied(context.Background(),
&schemav1.AwaitRevisionAppliedRequest{
+ MinRevision: 100,
+ Timeout: durationpb.New(80 * time.Millisecond),
+ })
+ require.NoError(t, err)
+ assert.False(t, resp.GetApplied())
+ require.Len(t, resp.GetLaggards(), 1)
+ assert.Equal(t, "data-data-slow", resp.GetLaggards()[0].GetNode())
+ assert.Equal(t, int64(42),
resp.GetLaggards()[0].GetCurrentModRevision())
+}
+
+// TestFanOut_AllLaggards_ListsAll verifies that when every probed peer is
+// behind the target, all of them appear in the laggards list with their
+// role-prefixed names.
+func TestFanOut_AllLaggards_ListsAll(t *testing.T) {
+ cache := &staticBarrierCache{maxModRevision: 0} // self also behind
+ tier1 := newFakeTier([]string{"peer-l"},
map[string]clusterv1.NodeSchemaStatusServiceClient{
+ "peer-l": &fakeNodeStatusClient{maxRev: 5},
+ })
+ tier2 := newFakeTier([]string{"peer-d"},
map[string]clusterv1.NodeSchemaStatusServiceClient{
+ "peer-d": &fakeNodeStatusClient{maxRev: 7},
+ })
+ svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: tier2, self:
"self-liaison"}).build()
+
+ resp, err := svc.AwaitRevisionApplied(context.Background(),
&schemav1.AwaitRevisionAppliedRequest{
+ MinRevision: 100,
+ Timeout: durationpb.New(60 * time.Millisecond),
+ })
+ require.NoError(t, err)
+ assert.False(t, resp.GetApplied())
+ assert.Equal(t, []string{"data-peer-d", "liaison-peer-l",
"liaison-self-liaison"},
+ laggardNames(resp.GetLaggards()))
+}
+
+// TestFanOut_NodeRPCErrors_CountedAsLaggard verifies that a non-Unimplemented
+// gRPC error from a per-member probe makes that member a laggard for the
+// iteration but does not fail the whole call. With a generous catch-up
+// scenario the loop converges once the error clears.
+func TestFanOut_NodeRPCErrors_CountedAsLaggard(t *testing.T) {
+ cache := &staticBarrierCache{maxModRevision: 100}
+ errClient := &fakeNodeStatusClient{err: status.Errorf(codes.Internal,
"boom")}
+ tier1 := newFakeTier(nil, nil)
+ tier2 := newFakeTier([]string{"sick"},
map[string]clusterv1.NodeSchemaStatusServiceClient{
+ "sick": errClient,
+ })
+ svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: tier2, self:
"self-liaison"}).build()
+
+ resp, err := svc.AwaitRevisionApplied(context.Background(),
&schemav1.AwaitRevisionAppliedRequest{
+ MinRevision: 100,
+ Timeout: durationpb.New(60 * time.Millisecond),
+ })
+ require.NoError(t, err)
+ assert.False(t, resp.GetApplied())
+ require.Len(t, resp.GetLaggards(), 1)
+ assert.Equal(t, "data-sick", resp.GetLaggards()[0].GetNode())
+}
+
+// TestFanOut_EmptyActiveSet_ReturnsUnavailable verifies that when the snapshot
+// finds zero members (no self, no peers, no data nodes) the call fails fast
+// with codes.Unavailable rather than parking on an empty watched set.
+func TestFanOut_EmptyActiveSet_ReturnsUnavailable(t *testing.T) {
+ svc := (&clusterFixture{
+ cache: &staticBarrierCache{},
+ tier1: newFakeTier(nil, nil),
+ tier2: newFakeTier(nil, nil),
+ self: "", // empty self → no member contributed
+ }).build()
+
+ resp, err := svc.AwaitRevisionApplied(context.Background(),
&schemav1.AwaitRevisionAppliedRequest{
+ MinRevision: 1,
+ Timeout: durationpb.New(20 * time.Millisecond),
+ })
+ require.Error(t, err)
+ assert.Nil(t, resp)
+ assert.Equal(t, codes.Unavailable, status.Code(err))
+}
+
+// TestFanOut_ProbeIsShortUnary_NoServerWait verifies that a slow per-member
+// probe (the fake delays past the call deadline) is treated as a transient
+// laggard rather than as a successful long-poll. The whole call returns
+// applied=false within the timeout instead of blocking on the slow probe.
+func TestFanOut_ProbeIsShortUnary_NoServerWait(t *testing.T) {
+ cache := &staticBarrierCache{maxModRevision: 100}
+ slow := &fakeNodeStatusClient{delay: 200 * time.Millisecond, maxRev:
100}
+ tier2 := newFakeTier([]string{"slow"},
map[string]clusterv1.NodeSchemaStatusServiceClient{
+ "slow": slow,
+ })
+ svc := (&clusterFixture{cache: cache, tier1: newFakeTier(nil, nil),
tier2: tier2, self: "self-liaison"}).build()
+
+ start := time.Now()
+ resp, err := svc.AwaitRevisionApplied(context.Background(),
&schemav1.AwaitRevisionAppliedRequest{
+ MinRevision: 100,
+ Timeout: durationpb.New(60 * time.Millisecond),
+ })
+ elapsed := time.Since(start)
+ require.NoError(t, err)
+ assert.False(t, resp.GetApplied())
+ assert.Less(t, elapsed, 500*time.Millisecond,
+ "call must respect its own deadline rather than waiting for the
slow per-probe response")
+ require.Len(t, resp.GetLaggards(), 1)
+ assert.Equal(t, "data-slow", resp.GetLaggards()[0].GetNode())
+}
+
+// TestFanOut_BackoffBounded verifies that when convergence requires multiple
+// iterations the caller observes >= 2 probes per member but the per-iteration
+// sleep stays bounded so the overall call respects its timeout. The fake
+// peer counts how many times GetMaxRevision was called; the loop should make
+// at least 2 probes inside an 80ms budget.
+func TestFanOut_BackoffBounded(t *testing.T) {
+ var calls int32
+ cache := &staticBarrierCache{maxModRevision: 100}
+ peer := &fakeNodeStatusClient{maxRev: 50, callsRef: &calls} //
permanently behind
+ tier2 := newFakeTier([]string{"laggy"},
map[string]clusterv1.NodeSchemaStatusServiceClient{
+ "laggy": peer,
+ })
+ svc := (&clusterFixture{cache: cache, tier1: newFakeTier(nil, nil),
tier2: tier2, self: "self-liaison"}).build()
+
+ start := time.Now()
+ _, err := svc.AwaitRevisionApplied(context.Background(),
&schemav1.AwaitRevisionAppliedRequest{
+ MinRevision: 100,
+ Timeout: durationpb.New(80 * time.Millisecond),
+ })
+ elapsed := time.Since(start)
+ require.NoError(t, err)
+ assert.GreaterOrEqual(t, atomic.LoadInt32(&calls), int32(2),
+ "backoff loop must make multiple probes within the timeout
window")
+ // 80ms budget plus init/grow/cap sleeps; even the worst-case wall-clock
+ // stays well under 500ms = barrierMaxInterval.
+ assert.Less(t, elapsed, 500*time.Millisecond,
+ "timeout must not be exceeded by the backoff schedule")
+}
+
+// TestFanOut_NodeReturnsUnimplemented_TreatedAsReady locks the cross-version
+// policy: a peer (or data node) that returns codes.Unimplemented from
+// NodeSchemaStatusService — i.e. a Phase-1 v0.11/v0.12 node — is treated as
+// ready (assume max_revision = ∞) so partial-upgrade clusters do not deadlock
+// barrier callers.
+func TestFanOut_NodeReturnsUnimplemented_TreatedAsReady(t *testing.T) {
+ cache := &staticBarrierCache{maxModRevision: 100}
+ legacy := &fakeNodeStatusClient{err: status.Error(codes.Unimplemented,
"phase 1 node")}
+ tier2 := newFakeTier([]string{"phase1"},
map[string]clusterv1.NodeSchemaStatusServiceClient{
+ "phase1": legacy,
+ })
+ svc := (&clusterFixture{cache: cache, tier1: newFakeTier(nil, nil),
tier2: tier2, self: "self-liaison"}).build()
+
+ resp, err := svc.AwaitRevisionApplied(context.Background(),
&schemav1.AwaitRevisionAppliedRequest{
+ MinRevision: 100,
+ Timeout: durationpb.New(50 * time.Millisecond),
+ })
+ require.NoError(t, err)
+ assert.True(t, resp.GetApplied(),
+ "Unimplemented from a Phase-1 peer must not block a v0.13
barrier caller")
+ assert.Empty(t, resp.GetLaggards())
+}
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index f71c8f9ca..d4b428e3f 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -184,10 +184,10 @@ func NewServer(_ context.Context, tir1Client, tir2Client,
broadcaster queue.Clie
propertyServer: propertyService,
}
- var barrierSVC *barrierService
var nodeStatusSVC *property.NodeSchemaStatusServer
+ var cacheProvider func() barrierCacheReader
if svc, svcOk := schemaRegistry.(metadata.Service); svcOk {
- barrierSVC = newBarrierService(func() barrierCacheReader {
+ cacheProvider = func() barrierCacheReader {
inner := svc.SchemaRegistry()
if inner == nil {
return nil
@@ -197,7 +197,7 @@ func NewServer(_ context.Context, tir1Client, tir2Client,
broadcaster queue.Clie
return nil
}
return bc
- })
+ }
// Phase 2 Step 2.1: every cluster member with a schema cache
exposes
// NodeSchemaStatusService so peer liaisons (Step 2.2 fan-out)
can
// probe this liaison's cache identically to a data node. The
receiving
@@ -221,7 +221,6 @@ func NewServer(_ context.Context, tir1Client, tir2Client,
broadcaster queue.Clie
traceSVC: traceSVC,
bydbQLSVC: bydbQLSVC,
groupRepo: gr,
- barrierSVC: barrierSVC,
nodeStatusSVC: nodeStatusSVC,
streamRegistryServer: &streamRegistryServer{
schemaRegistry: schemaRegistry,
@@ -253,6 +252,28 @@ func NewServer(_ context.Context, tir1Client, tir2Client,
broadcaster queue.Clie
protector: protectorService,
routeTableProviders: routeProviders,
}
+ // Phase 2 Step 2.2: wire the cluster-wide AwaitRevisionApplied fan-out.
+ // Tier1 (peer liaisons) and tier2 (data nodes) connection pools are
+ // borrowed via the queue.Client interface added in #1109; the receiving
+ // liaison's name is read from s.curNode, which initCurrentNode
populates
+ // during PreRun — hence the closure indirection. The explicit nil check
+ // covers the test-context case where ContextNodeKey is unset and
+ // initCurrentNode leaves curNode nil; an empty selfName makes the
+ // barrier exclude self from the watched set, which is the correct
+ // behavior for headless test fixtures.
+ if cacheProvider != nil {
+ s.barrierSVC = newBarrierServiceCluster(
+ cacheProvider,
+ func() queue.Client { return tir1Client },
+ func() queue.Client { return tir2Client },
+ func() string {
+ if s.curNode == nil {
+ return ""
+ }
+ return s.curNode.GetMetadata().GetName()
+ },
+ )
+ }
s.accessLogRecorders = []accessLogRecorder{streamSVC, measureSVC,
traceSVC, s.propertyServer}
s.queryAccessLogRecorders = []queryAccessLogRecorder{streamSVC,
measureSVC, traceSVC, s.propertyServer}