This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch phase-2-step-2-2-fanout-await-revision
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 3588e39c58eb316a717bca6f767ef7d786e3e656
Author: Hongtao Gao <[email protected]>
AuthorDate: Sun May 3 11:06:33 2026 +0000

feat(barrier): fan out AwaitRevisionApplied across cluster (Phase 2 Step 2.2)

Phase 1's AwaitRevisionApplied confirmed only the receiving liaison's local cache. Phase 2 Step 2.2 extends the call so the receiving liaison probes a frozen watched set (self + every peer liaison + every data node) in parallel, returns applied=true only when the minimum max revision across all members reaches min_revision, and returns laggards on timeout.

Membership snapshot:
- Self via curNode.Metadata.Name (resolved at call time, so PreRun has populated curNode by the time the barrier is invoked).
- Peer liaisons from tir1Client.GetRouteTable().Active (tier1 = liaison ↔ liaison routing per api/proto/banyandb/database/v1/rpc.proto:946).
- Data nodes from tir2Client.GetRouteTable().Active (tier2 = data-node routing).
- Dedup by Metadata.Name with the first-seen tier winning, so a hybrid host running both Role_ROLE_LIAISON and Role_ROLE_DATA is probed exactly once.

Standalone deployments (local pipeline → empty route tables) degenerate to {self} and use the same probe loop.

Per-member probe:
- Self: in-process barrierCacheReader.GetMaxModRevision(), the same call Phase 1 already used. No RPC is issued, although the read still runs on its own goroutine within the parallel iteration.
- Peer: clusterv1.NodeSchemaStatusService.GetMaxRevision via the *grpc.ClientConn borrowed from queue.Client.NewNodeSchemaStatusClient (added in #1109). No parallel connection mesh is opened; the probe inherits pub's auth / TLS / health-check / circuit-breaker.
- Per-probe context inherits the call-wide deadline (shared, not divided across N members), so wall-clock time stays bounded by req.timeout regardless of fan-out width.
- sync.WaitGroup-based parallel iteration; the results slice is indexed by member position, so order is stable for laggard reporting.

Outcome rules:
- min(rev across all members) >= min_revision → applied=true (early exit, typically after 1-3 iterations).
- codes.Unimplemented from a peer → treated as ready (assume max_revision = ∞) per the cross-version policy, so a v0.13 liaison rolling onto a v0.11/v0.12 data-node fleet does not deadlock.
- Any other gRPC error (Unavailable, Internal, deadline exceeded on a single probe, …) counts as a laggard for that iteration only; the next backoff iteration retries it. Members appear in the response's laggards list only when the overall call times out.
- Snapshot finds no members at all (no self, no peers, no data nodes) → codes.Unavailable("no active cluster members"), failing fast rather than parking on an empty watched set.
- Laggard identifier format: <role>-<Metadata.Name> (liaison-foo / data-bar), so logs are unambiguous when liaisons and data nodes share a host.

Backoff cadence is unchanged from Phase 1 (10ms initial interval, ×1.5 per iteration, capped at 500ms; 5s default timeout). The §5.0 sequence-diagram math still holds: each member sees at most ~18 probes per 5s budget; most calls converge in 1-3 iterations and return in under 100ms.

Phase 1 tests are unchanged: newBarrierService(cacheProvider) leaves peerLiaisons / dataNodes / selfName nil, so AwaitRevisionApplied falls back to the legacy in-process loop. Production wiring uses newBarrierServiceCluster, which sets all three.

Tests (banyand/liaison/grpc/barrier_cluster_test.go):
- TestFanOut_AllNodesReady_ReturnsApplied: happy path; the self + tier1 + tier2 watched set converges on the first iteration.
- TestFanOut_OneLaggard_ReportsOnTimeout: one peer permanently behind; the laggards list names exactly that peer with its role-prefixed identifier.
- TestFanOut_AllLaggards_ListsAll: every member behind; every member appears in the laggards list with its role tag.
- TestFanOut_NodeRPCErrors_CountedAsLaggard: codes.Internal from one peer is a transient laggard and does not fail the whole call.
- TestFanOut_EmptyActiveSet_ReturnsUnavailable: no members at all (empty self, empty route tables) → codes.Unavailable.
- TestFanOut_ProbeIsShortUnary_NoServerWait: a slow per-probe response must not stretch the whole call past its deadline.
- TestFanOut_BackoffBounded: multi-iteration polling stays inside the 10ms-500ms envelope.
- TestFanOut_NodeReturnsUnimplemented_TreatedAsReady: cross-version policy regression.

Frozen-snapshot semantics for mid-call leave/eviction/late-join, plus the NodeLaggard.reason proto field, land in the next PR (Step 2.2 snapshot semantics). Distributed integration tests §6.12a/b/d depend on PauseDataNodeWatch (Step 1.0, deferred per Q-12), so they are not added here either.

via [HAPI](https://hapi.run)
---
 CHANGES.md                                   |   3 +-
 banyand/liaison/grpc/barrier.go              |  41 +++-
 banyand/liaison/grpc/barrier_cluster.go      | 254 +++++++++++++++++++++
 banyand/liaison/grpc/barrier_cluster_test.go | 321 +++++++++++++++++++++++++++
 banyand/liaison/grpc/server.go               |  20 +-
 5 files changed, 630 insertions(+), 9 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index a0c3dc0a9..daad0e5da 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -33,7 +33,8 @@ Release Notes.
 - Guard `pkg/schema/cache` against out-of-order `EventDelete` events; expose monotonic `LatestModRevision` watermark.
 - Schema consistency (Phase 2 in progress): cluster-wide barrier groundwork. Internal-only; no client-facing surface impact yet.
   - Add `NodeSchemaStatusService` (`GetMaxRevision`, `GetKeyRevisions`, `GetAbsentKeys`) registered on every cluster member that holds a schema cache, so peer liaisons and data nodes can be probed identically by the upcoming barrier fan-out (#1108).
-  - Extend `queue.Client` with `NewNodeSchemaStatusClient(node)` so the barrier fan-out can borrow the existing tier1/tier2 connection pools instead of opening a parallel mesh.
+  - Extend `queue.Client` with `NewNodeSchemaStatusClient(node)` so the barrier fan-out can borrow the existing tier1/tier2 connection pools instead of opening a parallel mesh (#1109).
+  - `AwaitRevisionApplied` now fans out across the receiving liaison's frozen tier1 (peer-liaison) + tier2 (data-node) Active set, probing each member in parallel via `GetMaxRevision` with shared per-call deadline. Cross-version peers returning `codes.Unimplemented` are treated as ready so partial-upgrade clusters do not deadlock; transient RPC errors count as per-iteration laggards. Empty Active set fails fast with `codes.Unavailable`.

 ### Bug Fixes

diff --git a/banyand/liaison/grpc/barrier.go b/banyand/liaison/grpc/barrier.go
index 8e135a2e8..098301935 100644
--- a/banyand/liaison/grpc/barrier.go
+++ b/banyand/liaison/grpc/barrier.go
@@ -27,6 +27,7 @@ import (

 	schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
 	"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+	"github.com/apache/skywalking-banyandb/banyand/queue"
 )

 const (
@@ -52,12 +53,38 @@ type barrierCacheReader interface {
 type barrierService struct {
 	schemav1.UnimplementedSchemaBarrierServiceServer
 	cacheProvider func() barrierCacheReader
+	// peerLiaisons / dataNodes / selfName are the Phase 2.2 cluster fan-out
+	// dependencies. When all three are set, AwaitRevisionApplied probes the
+	// frozen tier1+tier2 watched set in parallel; when any are nil it falls
+	// back to the legacy in-process loop (used by Phase 1 unit tests).
+	peerLiaisons func() queue.Client
+	dataNodes    func() queue.Client
+	selfName     func() string
 }

 func newBarrierService(cacheProvider func() barrierCacheReader) *barrierService {
 	return &barrierService{cacheProvider: cacheProvider}
 }

+// newBarrierServiceCluster wires the Phase 2.2 cluster fan-out dependencies
+// alongside the cache provider. The receiving liaison's name and the per-tier
+// queue clients are resolved via closures so they can be captured at gRPC
+// server construction time and lazily evaluated once PreRun finishes
+// populating curNode and the connection pools.
+func newBarrierServiceCluster(
+	cacheProvider func() barrierCacheReader,
+	peerLiaisons func() queue.Client,
+	dataNodes func() queue.Client,
+	selfName func() string,
+) *barrierService {
+	return &barrierService{
+		cacheProvider: cacheProvider,
+		peerLiaisons:  peerLiaisons,
+		dataNodes:     dataNodes,
+		selfName:      selfName,
+	}
+}
+
 // cache resolves the current barrier cache reader. It returns nil if the
 // underlying schema registry is not yet initialized.
 func (b *barrierService) cache() barrierCacheReader {
@@ -67,11 +94,17 @@ func (b *barrierService) cache() barrierCacheReader {
 	return b.cacheProvider()
 }

-// AwaitRevisionApplied blocks until the cache's max modRevision is >= req.MinRevision
-// or the timeout elapses. In standalone mode there is one node, so Laggards carries a
-// single entry whose current_mod_revision reports the cache watermark — this lets
-// callers diagnose how far behind the standalone cache is even when applied=false.
+// AwaitRevisionApplied blocks until every node in the watched set reports a
+// max modRevision at or above req.MinRevision. When the cluster fan-out
+// dependencies are wired (production), the call probes the frozen tier1 +
+// tier2 + self watched set in parallel via NodeSchemaStatusService; without
+// those dependencies (Phase 1 unit-test path), it falls back to a single
+// in-process cache poll, returning a self-only laggard on timeout so callers
+// can diagnose how far behind the standalone cache is.
 func (b *barrierService) AwaitRevisionApplied(ctx context.Context, req *schemav1.AwaitRevisionAppliedRequest) (*schemav1.AwaitRevisionAppliedResponse, error) {
+	if b.peerLiaisons != nil && b.dataNodes != nil && b.selfName != nil {
+		return b.awaitRevisionAppliedCluster(ctx, req)
+	}
 	deadline := time.Now().Add(barrierDeadlineDuration(req.GetTimeout()))
 	pollCtx, cancel := context.WithDeadline(ctx, deadline)
 	defer cancel()
diff --git a/banyand/liaison/grpc/barrier_cluster.go b/banyand/liaison/grpc/barrier_cluster.go
new file mode 100644
index 000000000..8cfab2938
--- /dev/null
+++ b/banyand/liaison/grpc/barrier_cluster.go
@@ -0,0 +1,254 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"time"
+
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+
+	clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
+	schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+	"github.com/apache/skywalking-banyandb/banyand/queue"
+)
+
+// memberRole identifies the role under which a watched-set member was discovered.
+type memberRole int
+
+const (
+	roleLiaison memberRole = iota
+	roleData
+)
+
+func (r memberRole) String() string {
+	switch r {
+	case roleLiaison:
+		return "liaison"
+	case roleData:
+		return "data"
+	default:
+		return "unknown_role"
+	}
+}
+
+// member is a single cluster member in the frozen watched set built at the
+// start of an Await* call.
+type member struct {
+	name   string
+	role   memberRole
+	isSelf bool
+}
+
+// laggardName returns the addressable laggard identifier per the plan's
+// `<role>-<Metadata.Name>` convention.
+func (m member) laggardName() string {
+	return m.role.String() + "-" + m.name
+}
+
+// snapshotMembers builds the frozen watched set from the receiving liaison's
+// in-process self plus the tier1 (peer-liaison) and tier2 (data-node) Active
+// route tables. Dedup is by Metadata.Name with the first-seen tier winning,
+// so a hybrid host running both roles is probed exactly once: once via self
+// if it is the receiving liaison, otherwise via tier1.
+//
+// Standalone fallback: when both tier route tables are empty (the local
+// pipeline returns an empty RouteTable) the watched set degenerates to
+// {self}, which the probe loop handles uniformly with the multi-member case.
+func (b *barrierService) snapshotMembers() []member {
+	seen := map[string]struct{}{}
+	var watched []member
+
+	if b.selfName != nil {
+		if name := b.selfName(); name != "" {
+			seen[name] = struct{}{}
+			watched = append(watched, member{name: name, role: roleLiaison, isSelf: true})
+		}
+	}
+
+	addFromTier := func(provider func() queue.Client, role memberRole) {
+		if provider == nil {
+			return
+		}
+		client := provider()
+		if client == nil {
+			return
+		}
+		rt := client.GetRouteTable()
+		if rt == nil {
+			return
+		}
+		for _, name := range rt.GetActive() {
+			if _, dup := seen[name]; dup {
+				continue
+			}
+			seen[name] = struct{}{}
+			watched = append(watched, member{name: name, role: role})
+		}
+	}
+	addFromTier(b.peerLiaisons, roleLiaison)
+	addFromTier(b.dataNodes, roleData)
+
+	return watched
+}
+
+// probeResult is the outcome of a single per-iteration probe of one member.
+//
+// ready=true means the member is at or above the target revision (or returned
+// codes.Unimplemented, which the cross-version policy treats as ready). When
+// ready=false the (rev, err) pair carries whatever the probe last observed —
+// rev=0 + err!=nil indicates a transient RPC failure that the next iteration
+// retries; rev>0 + err==nil indicates the member is online but behind.
+type probeResult struct {
+	err    error
+	member member
+	rev    int64
+	ready  bool
+}
+
+// awaitRevisionAppliedCluster runs the cluster-wide fan-out for
+// AwaitRevisionApplied. The receiving liaison's own cache is probed
+// in-process; peers are probed via clusterv1.NodeSchemaStatusService over the
+// *grpc.ClientConn borrowed from queue.Client.
+func (b *barrierService) awaitRevisionAppliedCluster(ctx context.Context, req *schemav1.AwaitRevisionAppliedRequest) (*schemav1.AwaitRevisionAppliedResponse, error) {
+	members := b.snapshotMembers()
+	if len(members) == 0 {
+		return nil, status.Errorf(codes.Unavailable, "no active cluster members")
+	}
+
+	deadline := time.Now().Add(barrierDeadlineDuration(req.GetTimeout()))
+	pollCtx, cancel := context.WithDeadline(ctx, deadline)
+	defer cancel()
+
+	interval := barrierInitInterval
+	var lastResults []probeResult
+	for {
+		lastResults = b.probeMembers(pollCtx, members, req.GetMinRevision(), deadline)
+		if allReady(lastResults) {
+			return &schemav1.AwaitRevisionAppliedResponse{Applied: true}, nil
+		}
+		if time.Now().After(deadline) {
+			return &schemav1.AwaitRevisionAppliedResponse{
+				Applied:  false,
+				Laggards: revisionLaggards(lastResults),
+			}, nil
+		}
+		select {
+		case <-time.After(interval):
+		case <-pollCtx.Done():
+			return &schemav1.AwaitRevisionAppliedResponse{
+				Applied:  false,
+				Laggards: revisionLaggards(lastResults),
+			}, nil
+		}
+		interval = barrierBackoff(interval)
+	}
+}
+
+// probeMembers runs one parallel iteration of GetMaxRevision probes against
+// the watched set. Each per-member probe context inherits the call-wide
+// deadline (shared, not divided across N members) so the loop's wall-clock is
+// bounded by req.timeout regardless of fan-out width.
+func (b *barrierService) probeMembers(ctx context.Context, members []member, minRev int64, deadline time.Time) []probeResult {
+	results := make([]probeResult, len(members))
+	var wg sync.WaitGroup
+	probeCtx, cancel := context.WithDeadline(ctx, deadline)
+	defer cancel()
+	for i := range members {
+		wg.Add(1)
+		go func(idx int) {
+			defer wg.Done()
+			results[idx] = b.probeOne(probeCtx, members[idx], minRev)
+		}(i)
+	}
+	wg.Wait()
+	return results
+}
+
+// probeOne returns the per-member outcome for one iteration.
+func (b *barrierService) probeOne(ctx context.Context, m member, minRev int64) probeResult {
+	if m.isSelf {
+		c := b.cache()
+		if c == nil {
+			return probeResult{member: m}
+		}
+		rev := c.GetMaxModRevision()
+		return probeResult{member: m, rev: rev, ready: rev >= minRev}
+	}
+
+	tier := b.peerLiaisons
+	if m.role == roleData {
+		tier = b.dataNodes
+	}
+	if tier == nil {
+		return probeResult{member: m, err: errors.New("no tier client wired")}
+	}
+	client := tier()
+	if client == nil {
+		return probeResult{member: m, err: errors.New("tier client unavailable")}
+	}
+	statusClient, err := client.NewNodeSchemaStatusClient(m.name)
+	if err != nil {
+		return probeResult{member: m, err: err}
+	}
+	resp, rpcErr := statusClient.GetMaxRevision(ctx, &clusterv1.GetMaxRevisionRequest{})
+	if rpcErr != nil {
+		// Cross-version policy: a Phase-1 peer that does not implement
+		// NodeSchemaStatusService returns codes.Unimplemented; treat that
+		// member as ready (assume max_revision = ∞) so partial-upgrade
+		// clusters do not deadlock all barrier callers.
+		if status.Code(rpcErr) == codes.Unimplemented {
+			return probeResult{member: m, ready: true}
+		}
+		// Any other RPC error counts as a transient laggard for this
+		// iteration only — the next backoff iteration retries it.
+		return probeResult{member: m, err: rpcErr}
+	}
+	rev := resp.GetMaxModRevision()
+	return probeResult{member: m, rev: rev, ready: rev >= minRev}
+}
+
+// allReady reports whether every member's most recent probe returned ready.
+func allReady(results []probeResult) bool {
+	for _, r := range results {
+		if !r.ready {
+			return false
+		}
+	}
+	return true
+}
+
+// revisionLaggards builds the laggards list for the timeout response,
+// preserving the watched-set order. Cross-version-ready and ready-this-pass
+// members are excluded so the list contains only the actual stragglers.
+func revisionLaggards(results []probeResult) []*schemav1.NodeLaggard {
+	laggards := make([]*schemav1.NodeLaggard, 0)
+	for _, r := range results {
+		if r.ready {
+			continue
+		}
+		laggards = append(laggards, &schemav1.NodeLaggard{
+			Node:               r.member.laggardName(),
+			CurrentModRevision: r.rev,
+		})
+	}
+	return laggards
+}
diff --git a/banyand/liaison/grpc/barrier_cluster_test.go b/banyand/liaison/grpc/barrier_cluster_test.go
new file mode 100644
index 000000000..21a828d7c
--- /dev/null
+++ b/banyand/liaison/grpc/barrier_cluster_test.go
@@ -0,0 +1,321 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+	"context"
+	"errors"
+	"sort"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/types/known/durationpb"
+
+	clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+	schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+	"github.com/apache/skywalking-banyandb/banyand/queue"
+)
+
+// fakeNodeStatusClient is a minimal clusterv1.NodeSchemaStatusServiceClient.
+// Only GetMaxRevision is implemented for Phase 2.2 tests; the other RPCs
+// land in 2.3/2.4 and panic if a test accidentally invokes them.
+type fakeNodeStatusClient struct {
+	err      error
+	callsRef *int32
+	maxRev   int64
+	delay    time.Duration
+}
+
+func (f *fakeNodeStatusClient) GetMaxRevision(ctx context.Context, _ *clusterv1.GetMaxRevisionRequest, _ ...grpc.CallOption) (*clusterv1.GetMaxRevisionResponse, error) {
+	if f.callsRef != nil {
+		atomic.AddInt32(f.callsRef, 1)
+	}
+	if f.delay > 0 {
+		select {
+		case <-time.After(f.delay):
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		}
+	}
+	if f.err != nil {
+		return nil, f.err
+	}
+	return &clusterv1.GetMaxRevisionResponse{MaxModRevision: f.maxRev}, nil
+}
+
+func (*fakeNodeStatusClient) GetKeyRevisions(_ context.Context, _ *clusterv1.GetKeyRevisionsRequest, _ ...grpc.CallOption) (*clusterv1.GetKeyRevisionsResponse, error) {
+	panic("GetKeyRevisions: unused in Phase 2.2 tests")
+}
+
+func (*fakeNodeStatusClient) GetAbsentKeys(_ context.Context, _ *clusterv1.GetAbsentKeysRequest, _ ...grpc.CallOption) (*clusterv1.GetAbsentKeysResponse, error) {
+	panic("GetAbsentKeys: unused in Phase 2.2 tests")
+}
+
+// fakeQueueClient embeds queue.Client (a nil interface) so unused methods
+// panic at runtime; only the two methods the barrier fan-out actually calls
+// are overridden.
+type fakeQueueClient struct {
+	queue.Client
+	routeTable    *databasev1.RouteTable
+	statusClients map[string]clusterv1.NodeSchemaStatusServiceClient
+}
+
+func (f *fakeQueueClient) GetRouteTable() *databasev1.RouteTable {
+	if f.routeTable == nil {
+		return &databasev1.RouteTable{}
+	}
+	return f.routeTable
+}
+
+func (f *fakeQueueClient) NewNodeSchemaStatusClient(node string) (clusterv1.NodeSchemaStatusServiceClient, error) {
+	c, ok := f.statusClients[node]
+	if !ok {
+		return nil, errors.New("no fake client for node " + node)
+	}
+	return c, nil
+}
+
+func newFakeTier(active []string, clients map[string]clusterv1.NodeSchemaStatusServiceClient) *fakeQueueClient {
+	return &fakeQueueClient{
+		routeTable:    &databasev1.RouteTable{Active: active},
+		statusClients: clients,
+	}
+}
+
+// clusterFixture wires a barrier service for the cluster fan-out tests.
+type clusterFixture struct {
+	cache        barrierCacheReader
+	tier1, tier2 *fakeQueueClient
+	self         string
+}
+
+func (f *clusterFixture) build() *barrierService {
+	return newBarrierServiceCluster(
+		func() barrierCacheReader { return f.cache },
+		func() queue.Client { return f.tier1 },
+		func() queue.Client { return f.tier2 },
+		func() string { return f.self },
+	)
+}
+
+func laggardNames(laggards []*schemav1.NodeLaggard) []string {
+	out := make([]string, len(laggards))
+	for i, l := range laggards {
+		out[i] = l.GetNode()
+	}
+	sort.Strings(out)
+	return out
+}
+
+// TestFanOut_AllNodesReady_ReturnsApplied verifies that when every member
+// (self + peer liaisons + data nodes) reports max_rev >= min_rev, the call
+// returns Applied=true on the first iteration.
+func TestFanOut_AllNodesReady_ReturnsApplied(t *testing.T) {
+	const target = int64(100)
+	cache := &staticBarrierCache{maxModRevision: 200}
+	tier1 := newFakeTier([]string{"liaison-1"}, map[string]clusterv1.NodeSchemaStatusServiceClient{
+		"liaison-1": &fakeNodeStatusClient{maxRev: 150},
+	})
+	tier2 := newFakeTier([]string{"data-1", "data-2"}, map[string]clusterv1.NodeSchemaStatusServiceClient{
+		"data-1": &fakeNodeStatusClient{maxRev: 110},
+		"data-2": &fakeNodeStatusClient{maxRev: target},
+	})
+	svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: tier2, self: "self-liaison"}).build()
+
+	resp, err := svc.AwaitRevisionApplied(context.Background(), &schemav1.AwaitRevisionAppliedRequest{
+		MinRevision: target,
+		Timeout:     durationpb.New(50 * time.Millisecond),
+	})
+	require.NoError(t, err)
+	assert.True(t, resp.GetApplied())
+	assert.Empty(t, resp.GetLaggards())
+}
+
+// TestFanOut_OneLaggard_ReportsOnTimeout verifies that when one peer remains
+// behind the target on every iteration, the timeout response names exactly
+// that laggard with its addressable role-prefixed identifier.
+func TestFanOut_OneLaggard_ReportsOnTimeout(t *testing.T) {
+	cache := &staticBarrierCache{maxModRevision: 100}
+	tier1 := newFakeTier(nil, nil)
+	tier2 := newFakeTier([]string{"data-1", "data-slow"}, map[string]clusterv1.NodeSchemaStatusServiceClient{
+		"data-1":    &fakeNodeStatusClient{maxRev: 100},
+		"data-slow": &fakeNodeStatusClient{maxRev: 42}, // never catches up
+	})
+	svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: tier2, self: "self-liaison"}).build()
+
+	resp, err := svc.AwaitRevisionApplied(context.Background(), &schemav1.AwaitRevisionAppliedRequest{
+		MinRevision: 100,
+		Timeout:     durationpb.New(80 * time.Millisecond),
+	})
+	require.NoError(t, err)
+	assert.False(t, resp.GetApplied())
+	require.Len(t, resp.GetLaggards(), 1)
+	assert.Equal(t, "data-data-slow", resp.GetLaggards()[0].GetNode())
+	assert.Equal(t, int64(42), resp.GetLaggards()[0].GetCurrentModRevision())
+}
+
+// TestFanOut_AllLaggards_ListsAll verifies that when every probed peer is
+// behind the target, all of them appear in the laggards list with their
+// role-prefixed names.
+func TestFanOut_AllLaggards_ListsAll(t *testing.T) {
+	cache := &staticBarrierCache{maxModRevision: 0} // self also behind
+	tier1 := newFakeTier([]string{"peer-l"}, map[string]clusterv1.NodeSchemaStatusServiceClient{
+		"peer-l": &fakeNodeStatusClient{maxRev: 5},
+	})
+	tier2 := newFakeTier([]string{"peer-d"}, map[string]clusterv1.NodeSchemaStatusServiceClient{
+		"peer-d": &fakeNodeStatusClient{maxRev: 7},
+	})
+	svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: tier2, self: "self-liaison"}).build()
+
+	resp, err := svc.AwaitRevisionApplied(context.Background(), &schemav1.AwaitRevisionAppliedRequest{
+		MinRevision: 100,
+		Timeout:     durationpb.New(60 * time.Millisecond),
+	})
+	require.NoError(t, err)
+	assert.False(t, resp.GetApplied())
+	assert.Equal(t, []string{"data-peer-d", "liaison-peer-l", "liaison-self-liaison"},
+		laggardNames(resp.GetLaggards()))
+}
+
+// TestFanOut_NodeRPCErrors_CountedAsLaggard verifies that a non-Unimplemented
+// gRPC error from a per-member probe makes that member a laggard for the
+// iteration but does not fail the whole call. With a generous catch-up
+// scenario the loop converges once the error clears.
+func TestFanOut_NodeRPCErrors_CountedAsLaggard(t *testing.T) {
+	cache := &staticBarrierCache{maxModRevision: 100}
+	errClient := &fakeNodeStatusClient{err: status.Errorf(codes.Internal, "boom")}
+	tier1 := newFakeTier(nil, nil)
+	tier2 := newFakeTier([]string{"sick"}, map[string]clusterv1.NodeSchemaStatusServiceClient{
+		"sick": errClient,
+	})
+	svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: tier2, self: "self-liaison"}).build()
+
+	resp, err := svc.AwaitRevisionApplied(context.Background(), &schemav1.AwaitRevisionAppliedRequest{
+		MinRevision: 100,
+		Timeout:     durationpb.New(60 * time.Millisecond),
+	})
+	require.NoError(t, err)
+	assert.False(t, resp.GetApplied())
+	require.Len(t, resp.GetLaggards(), 1)
+	assert.Equal(t, "data-sick", resp.GetLaggards()[0].GetNode())
+}
+
+// TestFanOut_EmptyActiveSet_ReturnsUnavailable verifies that when the snapshot
+// finds zero members (no self, no peers, no data nodes) the call fails fast
+// with codes.Unavailable rather than parking on an empty watched set.
+func TestFanOut_EmptyActiveSet_ReturnsUnavailable(t *testing.T) {
+	svc := (&clusterFixture{
+		cache: &staticBarrierCache{},
+		tier1: newFakeTier(nil, nil),
+		tier2: newFakeTier(nil, nil),
+		self:  "", // empty self → no member contributed
+	}).build()
+
+	resp, err := svc.AwaitRevisionApplied(context.Background(), &schemav1.AwaitRevisionAppliedRequest{
+		MinRevision: 1,
+		Timeout:     durationpb.New(20 * time.Millisecond),
+	})
+	require.Error(t, err)
+	assert.Nil(t, resp)
+	assert.Equal(t, codes.Unavailable, status.Code(err))
+}
+
+// TestFanOut_ProbeIsShortUnary_NoServerWait verifies that a slow per-member
+// probe (the fake delays past the call deadline) is treated as a transient
+// laggard rather than as a successful long-poll. The whole call returns
+// applied=false within the timeout instead of blocking on the slow probe.
+func TestFanOut_ProbeIsShortUnary_NoServerWait(t *testing.T) {
+	cache := &staticBarrierCache{maxModRevision: 100}
+	slow := &fakeNodeStatusClient{delay: 200 * time.Millisecond, maxRev: 100}
+	tier2 := newFakeTier([]string{"slow"}, map[string]clusterv1.NodeSchemaStatusServiceClient{
+		"slow": slow,
+	})
+	svc := (&clusterFixture{cache: cache, tier1: newFakeTier(nil, nil), tier2: tier2, self: "self-liaison"}).build()
+
+	start := time.Now()
+	resp, err := svc.AwaitRevisionApplied(context.Background(), &schemav1.AwaitRevisionAppliedRequest{
+		MinRevision: 100,
+		Timeout:     durationpb.New(60 * time.Millisecond),
+	})
+	elapsed := time.Since(start)
+	require.NoError(t, err)
+	assert.False(t, resp.GetApplied())
+	assert.Less(t, elapsed, 500*time.Millisecond,
+		"call must respect its own deadline rather than waiting for the slow per-probe response")
+	require.Len(t, resp.GetLaggards(), 1)
+	assert.Equal(t, "data-slow", resp.GetLaggards()[0].GetNode())
+}
+
+// TestFanOut_BackoffBounded verifies that when convergence requires multiple
+// iterations the caller observes >= 2 probes per member but the per-iteration
+// sleep stays bounded so the overall call respects its timeout. The fake
+// peer counts how many times GetMaxRevision was called; the loop should make
+// at least 2 probes inside an 80ms budget.
+func TestFanOut_BackoffBounded(t *testing.T) {
+	var calls int32
+	cache := &staticBarrierCache{maxModRevision: 100}
+	peer := &fakeNodeStatusClient{maxRev: 50, callsRef: &calls} // permanently behind
+	tier2 := newFakeTier([]string{"laggy"}, map[string]clusterv1.NodeSchemaStatusServiceClient{
+		"laggy": peer,
+	})
+	svc := (&clusterFixture{cache: cache, tier1: newFakeTier(nil, nil), tier2: tier2, self: "self-liaison"}).build()
+
+	start := time.Now()
+	_, err := svc.AwaitRevisionApplied(context.Background(), &schemav1.AwaitRevisionAppliedRequest{
+		MinRevision: 100,
+		Timeout:     durationpb.New(80 * time.Millisecond),
+	})
+	elapsed := time.Since(start)
+	require.NoError(t, err)
+	assert.GreaterOrEqual(t, atomic.LoadInt32(&calls), int32(2),
+		"backoff loop must make multiple probes within the timeout window")
+	// 80ms budget plus init/grow/cap sleeps; even the worst-case wall-clock
+	// stays well under 500ms = barrierMaxInterval.
+	assert.Less(t, elapsed, 500*time.Millisecond,
+		"timeout must not be exceeded by the backoff schedule")
+}
+
+// TestFanOut_NodeReturnsUnimplemented_TreatedAsReady locks the cross-version
+// policy: a peer (or data node) that returns codes.Unimplemented from
+// NodeSchemaStatusService — i.e. a Phase-1 v0.11/v0.12 node — is treated as
+// ready (assume max_revision = ∞) so partial-upgrade clusters do not deadlock
+// barrier callers.
+func TestFanOut_NodeReturnsUnimplemented_TreatedAsReady(t *testing.T) {
+	cache := &staticBarrierCache{maxModRevision: 100}
+	legacy := &fakeNodeStatusClient{err: status.Error(codes.Unimplemented, "phase 1 node")}
+	tier2 := newFakeTier([]string{"phase1"}, map[string]clusterv1.NodeSchemaStatusServiceClient{
+		"phase1": legacy,
+	})
+	svc := (&clusterFixture{cache: cache, tier1: newFakeTier(nil, nil), tier2: tier2, self: "self-liaison"}).build()
+
+	resp, err := svc.AwaitRevisionApplied(context.Background(), &schemav1.AwaitRevisionAppliedRequest{
+		MinRevision: 100,
+		Timeout:     durationpb.New(50 * time.Millisecond),
+	})
+	require.NoError(t, err)
+	assert.True(t, resp.GetApplied(),
+		"Unimplemented from a Phase-1 peer must not block a v0.13 barrier caller")
+	assert.Empty(t, resp.GetLaggards())
+}
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index f71c8f9ca..cc96762bd 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -184,10 +184,10 @@ func NewServer(_ context.Context, tir1Client, tir2Client, broadcaster queue.Clie
 		propertyServer: propertyService,
 	}

-	var barrierSVC *barrierService
 	var nodeStatusSVC *property.NodeSchemaStatusServer
+	var cacheProvider func() barrierCacheReader
 	if svc, svcOk := schemaRegistry.(metadata.Service); svcOk {
-		barrierSVC = newBarrierService(func() barrierCacheReader {
+		cacheProvider = func() barrierCacheReader {
 			inner := svc.SchemaRegistry()
 			if inner == nil {
 				return nil
@@ -197,7 +197,7 @@ func NewServer(_ context.Context, tir1Client, tir2Client, broadcaster queue.Clie
 				return nil
 			}
 			return bc
-		})
+		}
 		// Phase 2 Step 2.1: every cluster member with a schema cache exposes
 		// NodeSchemaStatusService so peer liaisons (Step 2.2 fan-out) can
 		// probe this liaison's cache identically to a data node. The receiving
The receiving @@ -221,7 +221,6 @@ func NewServer(_ context.Context, tir1Client, tir2Client, broadcaster queue.Clie traceSVC: traceSVC, bydbQLSVC: bydbQLSVC, groupRepo: gr, - barrierSVC: barrierSVC, nodeStatusSVC: nodeStatusSVC, streamRegistryServer: &streamRegistryServer{ schemaRegistry: schemaRegistry, @@ -253,6 +252,19 @@ func NewServer(_ context.Context, tir1Client, tir2Client, broadcaster queue.Clie protector: protectorService, routeTableProviders: routeProviders, } + // Phase 2 Step 2.2: wire the cluster-wide AwaitRevisionApplied fan-out. + // Tier1 (peer liaisons) and tier2 (data nodes) connection pools are + // borrowed via the queue.Client interface added in #1109; the receiving + // liaison's name is read from s.curNode, which initCurrentNode populates + // during PreRun — hence the closure indirection. + if cacheProvider != nil { + s.barrierSVC = newBarrierServiceCluster( + cacheProvider, + func() queue.Client { return tir1Client }, + func() queue.Client { return tir2Client }, + func() string { return s.curNode.GetMetadata().GetName() }, + ) + } s.accessLogRecorders = []accessLogRecorder{streamSVC, measureSVC, traceSVC, s.propertyServer} s.queryAccessLogRecorders = []queryAccessLogRecorder{streamSVC, measureSVC, traceSVC, s.propertyServer}
