This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch phase-2-cp5-march in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit e9e1afcc9d43aa42d0f69080c0ed8faf8e6d46fa Author: Hongtao Gao <[email protected]> AuthorDate: Mon May 4 06:28:06 2026 +0000 feat(barrier): cluster fan-out for AwaitSchemaApplied (Phase 2 §FA-1, §FA-2) Phase 2 Step 2.3 — extend the cluster barrier from #1111 (Step 2.2) to the per-key path. AwaitSchemaApplied now probes a frozen tier1 + tier2 + self watched set in parallel via GetKeyRevisions; the receiving liaison's own cache is read in-process. Snapshot semantics — eviction / leave / late-join — match the AwaitRevisionApplied behavior added in §SS-1..SS-4. Algorithm: - Per-iteration loop: re-snapshot tier1 + tier2 route tables, apply membership transitions (applyTransitionsApplied), probe alive set, decide. Eviction laggards for the per-key path carry only Node + Reason ("evicted_during_poll"); the cluster has already removed the member, so its outstanding key state is no longer something the call is waiting on. - Per-member probe: chunk keys at barrierKeyChunkSize=1000 per RPC; each per-chunk context inherits time.Until(deadline) (shared, NOT divided across chunks or members). Total wall-clock stays bounded by req.Timeout regardless of fan-out width or chunk count. - Self path: in-process scan via barrierCacheReader, mirroring Phase 1's collectMissingKeys logic. - Cross-version policy: a peer returning codes.Unimplemented from GetKeyRevisions (Phase-1 v0.11/v0.12 node) is treated as ready (assume every key applied) so partial-upgrade clusters do not deadlock barrier callers. Other gRPC errors → transient laggard for this iteration only. - Backoff cadence is unchanged: 10ms init × 1.5 cap 500ms; default 5s timeout. Dispatch: AwaitSchemaApplied checks peerLiaisons / dataNodes / selfName at the top of the call and routes to awaitSchemaAppliedCluster when all three are wired (production); falls back to the legacy in-process loop otherwise (Phase 1 unit-test path). 
Tests (barrier_cluster_test.go): - fakeNodeStatusClient gains a real GetKeyRevisions impl backed by a per-key state map (fakeKeyState{rev, present}); per-call counters and delays let tests verify chunking + shared-deadline behavior. - TestAwaitSchemaApplied_FanOut_PerKeyLaggards: timeout response carries per-node missing_keys with role-prefixed identifiers. - TestAwaitSchemaApplied_FanOut_ChunkedKeys: a 1500-key request triggers ≥ 2 GetKeyRevisions calls per peer (1000 + 500) per iteration; the per-peer aggregated missing_keys covers all 1500 across chunks. - TestAwaitSchemaApplied_FanOut_SharedDeadline: regression — with each chunk delay 100ms and req.Timeout 50ms, total wall-clock stays under 180ms (NOT 2 × 100ms = 200ms, which is what chunk RPCs ignoring the call-wide deadline would take). - TestAwaitSchemaApplied_FanOut_PeerUnimplemented_TreatedAsReady: Unimplemented from a Phase-1 peer must not block AwaitSchemaApplied. All 19 cluster fan-out tests pass; existing barrier_test.go tests (Phase 1 single-cache path + collectMissingKeys / collectPresentKeys helpers) stay green; lint clean. via [HAPI](https://hapi.run) --- banyand/liaison/grpc/barrier.go | 8 +- banyand/liaison/grpc/barrier_cluster.go | 228 +++++++++++++++++++++++ banyand/liaison/grpc/barrier_cluster_test.go | 260 +++++++++++++++++++++++++-- 3 files changed, 483 insertions(+), 13 deletions(-) diff --git a/banyand/liaison/grpc/barrier.go b/banyand/liaison/grpc/barrier.go index 098301935..ec152b8ec 100644 --- a/banyand/liaison/grpc/barrier.go +++ b/banyand/liaison/grpc/barrier.go @@ -138,8 +138,14 @@ func (b *barrierService) AwaitRevisionApplied(ctx context.Context, req *schemav1 } // AwaitSchemaApplied blocks until all requested keys are present at or above their -// per-key min_revisions, or the timeout elapses. +// per-key min_revisions, or the timeout elapses.
When the cluster fan-out
+// dependencies are wired (production), the call probes the frozen tier1 +
+// tier2 + self watched set in parallel via GetKeyRevisions; without them
+// (Phase 1 unit-test path), it falls back to a single in-process cache poll.
 func (b *barrierService) AwaitSchemaApplied(ctx context.Context, req *schemav1.AwaitSchemaAppliedRequest) (*schemav1.AwaitSchemaAppliedResponse, error) {
+	if b.peerLiaisons != nil && b.dataNodes != nil && b.selfName != nil {
+		return b.awaitSchemaAppliedCluster(ctx, req)
+	}
 	if len(req.GetKeys()) > barrierMaxKeys {
 		return nil, status.Errorf(codes.InvalidArgument, "too many keys: max=%d", barrierMaxKeys)
 	}
diff --git a/banyand/liaison/grpc/barrier_cluster.go b/banyand/liaison/grpc/barrier_cluster.go
index 3cbbae78e..8b14410d1 100644
--- a/banyand/liaison/grpc/barrier_cluster.go
+++ b/banyand/liaison/grpc/barrier_cluster.go
@@ -329,6 +329,243 @@ func (b *barrierService) probeOne(ctx context.Context, m member, minRev int64) p
 	return probeResult{member: m, rev: rev, ready: rev >= minRev}
 }
 
+// keyProbeResult is the per-iteration outcome of a GetKeyRevisions probe for
+// one member. missingKeys carries only the keys that failed the apply check
+// (absent or below their target revision). ready=true when missingKeys is
+// empty after the probe; err!=nil means the probe is a transient laggard for
+// this iteration only.
+type keyProbeResult struct {
+	err         error
+	missingKeys []*schemav1.SchemaKey
+	member      member
+	ready       bool
+}
+
+// barrierKeyChunkSize matches the plan §2.3 chunking policy: each peer probe
+// for AwaitSchemaApplied / AwaitSchemaDeleted issues at most 1000 keys per
+// RPC. The server-side cap (nodeStatusMaxKeys = 10000) is the absolute limit;
+// 1000 is a soft chunking threshold that keeps individual RPC payloads well
+// under typical gRPC max-message-size defaults and fans the work out across
+// multiple round-trips for large requests.
+const barrierKeyChunkSize = 1000
+
+// awaitSchemaAppliedCluster runs the cluster-wide fan-out for
+// AwaitSchemaApplied. Self is probed in-process via the cache; peers are
+// probed via GetKeyRevisions over the *grpc.ClientConn borrowed from
+// queue.Client. Per-member probes chunk keys at barrierKeyChunkSize per RPC,
+// each chunk inheriting the call-wide deadline (shared, not divided across
+// chunks or members). Frozen-snapshot semantics — eviction / leave / late
+// joiner exclusion — match awaitRevisionAppliedCluster's behavior.
+func (b *barrierService) awaitSchemaAppliedCluster(ctx context.Context, req *schemav1.AwaitSchemaAppliedRequest) (*schemav1.AwaitSchemaAppliedResponse, error) {
+	if len(req.GetKeys()) > barrierMaxKeys {
+		return nil, status.Errorf(codes.InvalidArgument, "too many keys: max=%d", barrierMaxKeys)
+	}
+	frozen := b.snapshotMembers()
+	if len(frozen) == 0 {
+		return nil, status.Errorf(codes.Unavailable, "no active cluster members")
+	}
+
+	deadline := time.Now().Add(barrierDeadlineDuration(req.GetTimeout()))
+	pollCtx, cancel := context.WithDeadline(ctx, deadline)
+	defer cancel()
+
+	alive := frozen
+	var evictedLaggards []*schemav1.NodeLaggard
+	interval := barrierInitInterval
+	var lastResults []keyProbeResult
+	for {
+		view := b.currentMembership()
+		var newlyEvicted []*schemav1.NodeLaggard
+		alive, newlyEvicted = applyTransitionsApplied(alive, view)
+		evictedLaggards = append(evictedLaggards, newlyEvicted...)
+
+		if len(alive) == 0 {
+			return &schemav1.AwaitSchemaAppliedResponse{Applied: true, Laggards: evictedLaggards}, nil
+		}
+
+		lastResults = b.probeKeyRevisions(pollCtx, alive, req.GetKeys(), req.GetMinRevisions(), deadline)
+		if allKeysApplied(lastResults) {
+			return &schemav1.AwaitSchemaAppliedResponse{Applied: true, Laggards: evictedLaggards}, nil
+		}
+		if time.Now().After(deadline) {
+			return &schemav1.AwaitSchemaAppliedResponse{
+				Applied:  false,
+				Laggards: append(keyLaggards(lastResults), evictedLaggards...),
+			}, nil
+		}
+		select {
+		case <-time.After(interval):
+		case <-pollCtx.Done():
+			return &schemav1.AwaitSchemaAppliedResponse{
+				Applied:  false,
+				Laggards: append(keyLaggards(lastResults), evictedLaggards...),
+			}, nil
+		}
+		interval = barrierBackoff(interval)
+	}
+}
+
+// applyTransitionsApplied is the AwaitSchemaApplied / AwaitSchemaDeleted
+// sibling of applyTransitions. Eviction laggards for the per-key barriers
+// carry only the role-prefixed name and Reason; missing_keys /
+// still_present_keys are intentionally empty because the cluster has
+// already removed the member from the watched set, so its outstanding key
+// state is no longer something the call is waiting on.
+func applyTransitionsApplied(alive []member, view membershipView) (kept []member, newlyEvicted []*schemav1.NodeLaggard) {
+	for _, m := range alive {
+		if m.isSelf {
+			kept = append(kept, m)
+			continue
+		}
+		if _, evicted := view.evictable[m.name]; evicted {
+			newlyEvicted = append(newlyEvicted, &schemav1.NodeLaggard{
+				Node:   m.laggardName(),
+				Reason: evictedLaggardReason,
+			})
+			continue
+		}
+		if _, active := view.active[m.name]; !active {
+			continue
+		}
+		kept = append(kept, m)
+	}
+	return kept, newlyEvicted
+}
+
+// probeKeyRevisions runs one parallel iteration of GetKeyRevisions probes
+// against the watched set. Each per-member probe chunks the key list into
+// blocks of barrierKeyChunkSize and aggregates per-chunk responses into a
+// single missingKeys slice; the call-wide deadline is shared across all
+// chunks of all members.
+func (b *barrierService) probeKeyRevisions(ctx context.Context, members []member, keys []*schemav1.SchemaKey, minRevs []int64, deadline time.Time) []keyProbeResult {
+	results := make([]keyProbeResult, len(members))
+	var wg sync.WaitGroup
+	probeCtx, cancel := context.WithDeadline(ctx, deadline)
+	defer cancel()
+	for i := range members {
+		wg.Add(1)
+		go func(idx int) {
+			defer wg.Done()
+			results[idx] = b.probeKeysOne(probeCtx, members[idx], keys, minRevs)
+		}(i)
+	}
+	wg.Wait()
+	return results
+}
+
+// probeKeysOne returns the per-member outcome for one iteration of
+// AwaitSchemaApplied's per-key probe. Self uses an in-process cache scan;
+// peers use GetKeyRevisions chunked at barrierKeyChunkSize. A peer answering
+// Unimplemented is reported ready; any other RPC error, or a reply whose
+// length differs from its chunk, is reported through err for this iteration.
+func (b *barrierService) probeKeysOne(ctx context.Context, m member, keys []*schemav1.SchemaKey, minRevs []int64) keyProbeResult {
+	if m.isSelf {
+		c := b.cache()
+		if c == nil {
+			// Fail-closed: nil cache → every key is missing so the loop
+			// keeps polling until the cache is online.
+			missing := make([]*schemav1.SchemaKey, len(keys))
+			copy(missing, keys)
+			return keyProbeResult{member: m, missingKeys: missing}
+		}
+		var missing []*schemav1.SchemaKey
+		for idx, key := range keys {
+			propID := schemaKeyToPropID(key)
+			rev, ok := c.GetKeyModRevision(propID)
+			var minRev int64
+			if idx < len(minRevs) {
+				minRev = minRevs[idx]
+			}
+			if !ok || (minRev > 0 && rev < minRev) {
+				missing = append(missing, key)
+			}
+		}
+		return keyProbeResult{member: m, missingKeys: missing, ready: len(missing) == 0}
+	}
+
+	tier := b.peerLiaisons
+	if m.role == roleData {
+		tier = b.dataNodes
+	}
+	if tier == nil {
+		return keyProbeResult{member: m, err: errors.New("no tier client wired")}
+	}
+	client := tier()
+	if client == nil {
+		return keyProbeResult{member: m, err: errors.New("tier client unavailable")}
+	}
+	statusClient, err := client.NewNodeSchemaStatusClient(m.name)
+	if err != nil {
+		return keyProbeResult{member: m, err: err}
+	}
+
+	var missing []*schemav1.SchemaKey
+	for chunkStart := 0; chunkStart < len(keys); chunkStart += barrierKeyChunkSize {
+		chunkEnd := min(chunkStart+barrierKeyChunkSize, len(keys))
+		chunkKeys := keys[chunkStart:chunkEnd]
+		resp, rpcErr := statusClient.GetKeyRevisions(ctx, &clusterv1.GetKeyRevisionsRequest{Keys: chunkKeys})
+		if rpcErr != nil {
+			// Cross-version: a Phase-1 peer answers Unimplemented; treat
+			// that member as ready (assume every key applied).
+			if status.Code(rpcErr) == codes.Unimplemented {
+				return keyProbeResult{member: m, ready: true}
+			}
+			// Other RPC errors are transient laggards for this iteration.
+			return keyProbeResult{member: m, err: rpcErr}
+		}
+		revisions := resp.GetRevisions()
+		// The loop below pairs revisions[i] with chunkKeys[i], so a reply of
+		// any other length cannot be trusted: a short one would let unanswered
+		// keys pass as applied, and a long one would index past chunkKeys and
+		// panic inside the probe goroutine. Fail closed as a transient laggard.
+		if len(revisions) != len(chunkKeys) {
+			mismatch := status.Errorf(codes.Internal, "GetKeyRevisions returned %d revisions for %d keys",
+				len(revisions), len(chunkKeys))
+			return keyProbeResult{member: m, err: mismatch}
+		}
+		for i, kr := range revisions {
+			globalIdx := chunkStart + i
+			var minRev int64
+			if globalIdx < len(minRevs) {
+				minRev = minRevs[globalIdx]
+			}
+			if !kr.GetPresent() || (minRev > 0 && kr.GetModRevision() < minRev) {
+				missing = append(missing, chunkKeys[i])
+			}
+		}
+	}
+	return keyProbeResult{member: m, missingKeys: missing, ready: len(missing) == 0}
+}
+
+// allKeysApplied reports whether every member's most recent probe found
+// every key applied at or above its target revision.
+func allKeysApplied(results []keyProbeResult) bool {
+	for _, r := range results {
+		if !r.ready {
+			return false
+		}
+	}
+	return true
+}
+
+// keyLaggards builds the laggards list for the timeout response. Members
+// that succeeded this iteration are excluded so the list contains only
+// the actual stragglers (each carrying their per-node missing_keys).
+// A member whose probe failed with err appears with empty missing_keys.
+func keyLaggards(results []keyProbeResult) []*schemav1.NodeLaggard {
+	laggards := make([]*schemav1.NodeLaggard, 0)
+	for _, r := range results {
+		if r.ready {
+			continue
+		}
+		laggards = append(laggards, &schemav1.NodeLaggard{
+			Node:        r.member.laggardName(),
+			MissingKeys: r.missingKeys,
+		})
+	}
+	return laggards
+}
+
 // allReady reports whether every member's most recent probe returned ready.
 func allReady(results []probeResult) bool {
 	for _, r := range results {
diff --git a/banyand/liaison/grpc/barrier_cluster_test.go b/banyand/liaison/grpc/barrier_cluster_test.go
index adcad0921..2921e6156 100644
--- a/banyand/liaison/grpc/barrier_cluster_test.go
+++ b/banyand/liaison/grpc/barrier_cluster_test.go
@@ -21,6 +21,7 @@ import (
 	"context"
 	"errors"
 	"sort"
+	"strconv"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -38,18 +39,27 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/queue"
 )
 
+// fakeKeyState backs the GetKeyRevisions fake response for a single key.
+// Keys absent from the keyState map default to Present=false (the natural
+// "node hasn't seen this key" semantic).
+type fakeKeyState struct {
+	rev     int64
+	present bool
+}
+
 // fakeNodeStatusClient is a minimal clusterv1.NodeSchemaStatusServiceClient.
-// Only GetMaxRevision is implemented for Phase 2.2 tests; the other RPCs
-// land in 2.3/2.4 and panic if a test accidentally invokes them. revs lets
-// frozen-snapshot tests vary the returned revision per call (call k returns
-// revs[min(k, len-1)]) so the barrier can observe a member catching up
-// across iterations.
+// GetMaxRevision and GetKeyRevisions are implemented; GetAbsentKeys stays a
+// panic stub until Phase 2.4 (FD-1/FD-2). revs varies GetMaxRevision per call
+// (call k returns revs[min(k, len-1)]); keyState/keyRevs* drive GetKeyRevisions.
 type fakeNodeStatusClient struct {
-	err      error
-	callsRef *int32
-	revs     []int64
-	maxRev   int64
-	delay    time.Duration
+	err          error
+	callsRef     *int32
+	keyState     map[string]fakeKeyState
+	keyRevsCalls *int32
+	revs         []int64
+	maxRev       int64
+	delay        time.Duration
+	keyRevsDelay time.Duration
 }
 
 func (f *fakeNodeStatusClient) GetMaxRevision(ctx context.Context, _ *clusterv1.GetMaxRevisionRequest, _ ...grpc.CallOption) (*clusterv1.GetMaxRevisionResponse, error) {
@@ -77,8 +87,43 @@ func (f *fakeNodeStatusClient) GetMaxRevision(ctx context.Context, _ *clusterv1.
 	return &clusterv1.GetMaxRevisionResponse{MaxModRevision: f.maxRev}, nil
 }
 
-func (*fakeNodeStatusClient) GetKeyRevisions(_ context.Context, _ *clusterv1.GetKeyRevisionsRequest, _ ...grpc.CallOption) (*clusterv1.GetKeyRevisionsResponse, error) {
-	panic("GetKeyRevisions: unused in Phase 2.2 tests")
+func (f *fakeNodeStatusClient) GetKeyRevisions(
+	ctx context.Context, req *clusterv1.GetKeyRevisionsRequest, _ ...grpc.CallOption,
+) (*clusterv1.GetKeyRevisionsResponse, error) {
+	if f.keyRevsCalls != nil {
+		atomic.AddInt32(f.keyRevsCalls, 1)
+	}
+	if f.keyRevsDelay > 0 {
+		select {
+		case <-time.After(f.keyRevsDelay):
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		}
+	}
+	if f.err != nil {
+		return nil, f.err
+	}
+	keys := req.GetKeys()
+	revs := make([]*clusterv1.KeyRevision, len(keys))
+	for i, k := range keys {
+		mapKey := keyStateKey(k.GetKind(), k.GetGroup(), k.GetName())
+		if state, ok := f.keyState[mapKey]; ok {
+			revs[i] = &clusterv1.KeyRevision{Key: k, Present: state.present, ModRevision: state.rev}
+			continue
+		}
+		revs[i] = &clusterv1.KeyRevision{Key: k}
+	}
+	return &clusterv1.GetKeyRevisionsResponse{Revisions: revs}, nil
+}
+
+// keyStateKey produces the deterministic "kind|group|name" map key used by
+// fakeNodeStatusClient to look up per-(kind,group,name) state. The fake's
+// GetKeyRevisions lookup and the tests' state injection both build keys
+// through this helper, so the two cannot drift apart if the format changes.
+// Tests currently inject state only for kind "stream" in group "g"; the fake
+// passes each requested key's own kind and group.
+func keyStateKey(kind, group, name string) string {
+	return kind + "|" + group + "|" + name
 }
 
 func (*fakeNodeStatusClient) GetAbsentKeys(_ context.Context, _ *clusterv1.GetAbsentKeysRequest, _ ...grpc.CallOption) (*clusterv1.GetAbsentKeysResponse, error) {
@@ -451,3 +496,194 @@ func TestFanOut_LateJoiner_Excluded(t *testing.T) {
 	assert.Empty(t, resp.GetLaggards(),
 		"laggards list must not contain the late joiner peer-B")
 }
+
+// presentAt builds a fakeKeyState shorthand for "key is present on this
+// member at the given revision."
+//
+//nolint:unparam // tests currently target rev=100 only; the parameter is kept for clarity.
+func presentAt(rev int64) fakeKeyState { return fakeKeyState{rev: rev, present: true} }
+
+// streamKey is a SchemaKey constructor for the cluster fan-out tests.
+//
+//nolint:unparam // tests currently use a single group "g"; group remains in the signature for readability.
+func streamKey(group, name string) *schemav1.SchemaKey {
+	return &schemav1.SchemaKey{Kind: "stream", Group: group, Name: name}
+}
+
+// laggardByNode returns the NodeLaggard whose Node field matches `name`, or
+// nil if not found. Used to assert per-node laggard payloads without
+// depending on slice ordering.
+func laggardByNode(laggards []*schemav1.NodeLaggard, name string) *schemav1.NodeLaggard {
+	for _, l := range laggards {
+		if l.GetNode() == name {
+			return l
+		}
+	}
+	return nil
+}
+
+// TestAwaitSchemaApplied_FanOut_PerKeyLaggards verifies that the timeout
+// response lists each lagging peer under its role-prefixed name with exactly
+// the keys it is missing; self holds every key and must not be listed.
+func TestAwaitSchemaApplied_FanOut_PerKeyLaggards(t *testing.T) {
+	keys := []*schemav1.SchemaKey{
+		streamKey("g", "k0"),
+		streamKey("g", "k1"),
+		streamKey("g", "k2"),
+		streamKey("g", "k3"),
+		streamKey("g", "k4"),
+	}
+	cache := &staticBarrierCache{
+		maxModRevision: 100,
+		keys: map[string]int64{
+			"stream_g/k0": 100, "stream_g/k1": 100,
+			"stream_g/k2": 100, "stream_g/k3": 100, "stream_g/k4": 100,
+		},
+	}
+	peerA := &fakeNodeStatusClient{keyState: map[string]fakeKeyState{
+		keyStateKey("stream", "g", "k0"): presentAt(100),
+		keyStateKey("stream", "g", "k1"): presentAt(100),
+	}} // missing: k2, k3, k4
+	peerB := &fakeNodeStatusClient{keyState: map[string]fakeKeyState{
+		keyStateKey("stream", "g", "k0"): presentAt(100),
+		keyStateKey("stream", "g", "k1"): presentAt(100),
+		keyStateKey("stream", "g", "k2"): presentAt(100),
+		keyStateKey("stream", "g", "k3"): presentAt(100),
+	}} // missing: k4
+	tier1 := newFakeTier([]string{"peer-A", "peer-B"}, map[string]clusterv1.NodeSchemaStatusServiceClient{
+		"peer-A": peerA,
+		"peer-B": peerB,
+	})
+	svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: newFakeTier(nil, nil), self: "self-liaison"}).build()
+
+	resp, err := svc.AwaitSchemaApplied(context.Background(), &schemav1.AwaitSchemaAppliedRequest{
+		Keys:    keys,
+		Timeout: durationpb.New(60 * time.Millisecond),
+	})
+	require.NoError(t, err)
+	assert.False(t, resp.GetApplied())
+	require.Len(t, resp.GetLaggards(), 2)
+
+	la := laggardByNode(resp.GetLaggards(), "liaison-peer-A")
+	require.NotNil(t, la, "laggard for peer-A must be present")
+	assert.Equal(t, []string{"k2", "k3", "k4"}, schemaKeyNames(la.GetMissingKeys()))
+
+	lb := laggardByNode(resp.GetLaggards(), "liaison-peer-B")
+	require.NotNil(t, lb, "laggard for peer-B must be present")
+	assert.Equal(t, []string{"k4"}, schemaKeyNames(lb.GetMissingKeys()))
+}
+
+// schemaKeyNames returns the Name of each SchemaKey, sorted, so assertions on
+// missing_keys do not depend on the order the barrier reports them in.
+func schemaKeyNames(keys []*schemav1.SchemaKey) []string {
+	out := make([]string, len(keys))
+	for i, k := range keys {
+		out[i] = k.GetName()
+	}
+	sort.Strings(out)
+	return out
+}
+
+// TestAwaitSchemaApplied_FanOut_ChunkedKeys verifies that a request whose
+// key count exceeds barrierKeyChunkSize (1000) triggers multiple per-peer
+// RPCs, with the per-chunk responses correctly aggregated into a single
+// missing_keys slice for that peer.
+func TestAwaitSchemaApplied_FanOut_ChunkedKeys(t *testing.T) {
+	const total = 1500
+	keys := make([]*schemav1.SchemaKey, total)
+	cacheKeys := make(map[string]int64, total)
+	for i := range total {
+		name := "k" + strconv.Itoa(i)
+		keys[i] = streamKey("g", name)
+		cacheKeys["stream_g/"+name] = 100 // self has them all
+	}
+	cache := &staticBarrierCache{maxModRevision: 100, keys: cacheKeys}
+
+	var peerCalls int32
+	peer := &fakeNodeStatusClient{
+		keyState:     map[string]fakeKeyState{}, // peer has nothing → every key missing
+		keyRevsCalls: &peerCalls,
+	}
+	tier1 := newFakeTier([]string{"peer"}, map[string]clusterv1.NodeSchemaStatusServiceClient{"peer": peer})
+	svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: newFakeTier(nil, nil), self: "self-liaison"}).build()
+
+	resp, err := svc.AwaitSchemaApplied(context.Background(), &schemav1.AwaitSchemaAppliedRequest{
+		Keys:    keys,
+		Timeout: durationpb.New(40 * time.Millisecond),
+	})
+	require.NoError(t, err)
+	assert.False(t, resp.GetApplied())
+
+	// Expect 2 chunks (1000 + 500) per iteration. The fake has no delay and no
+	// error, so every iteration issues both chunks and the call count stays even.
+	assert.GreaterOrEqual(t, atomic.LoadInt32(&peerCalls), int32(2),
+		"a 1500-key request must produce >= 2 GetKeyRevisions calls per peer (chunked at 1000)")
+	assert.Equal(t, int32(0), atomic.LoadInt32(&peerCalls)%2,
+		"chunks per iteration is 2 (1000 + 500); call count should be a multiple of 2")
+
+	// All 1500 keys missing from peer; aggregated across both chunks.
+	la := laggardByNode(resp.GetLaggards(), "liaison-peer")
+	require.NotNil(t, la)
+	assert.Len(t, la.GetMissingKeys(), total,
+		"per-peer missing_keys aggregates both chunks")
+}
+
+// TestAwaitSchemaApplied_FanOut_SharedDeadline guards the plan §2.3 contract
+// that chunk RPCs run under the call-wide deadline: with per-chunk delays larger
+// than the call's total budget, the call must finish near req.Timeout, NOT N × delay.
+// NOTE(review): a req.Timeout / N split would likely pass too (chunk 1 fails first).
+func TestAwaitSchemaApplied_FanOut_SharedDeadline(t *testing.T) {
+	const total = 1500
+	keys := make([]*schemav1.SchemaKey, total)
+	for i := range total {
+		keys[i] = streamKey("g", "k"+strconv.Itoa(i))
+	}
+	cache := &staticBarrierCache{maxModRevision: 100} // no keys → self also missing
+
+	peer := &fakeNodeStatusClient{
+		keyState:     map[string]fakeKeyState{},
+		keyRevsDelay: 100 * time.Millisecond, // each chunk is slow
+	}
+	tier1 := newFakeTier([]string{"peer"}, map[string]clusterv1.NodeSchemaStatusServiceClient{"peer": peer})
+	svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: newFakeTier(nil, nil), self: "self-liaison"}).build()
+
+	start := time.Now()
+	resp, err := svc.AwaitSchemaApplied(context.Background(), &schemav1.AwaitSchemaAppliedRequest{
+		Keys:    keys,
+		Timeout: durationpb.New(50 * time.Millisecond),
+	})
+	elapsed := time.Since(start)
+	require.NoError(t, err)
+	assert.False(t, resp.GetApplied())
+	// A chunk RPC that ignored the deadline would hold the call for the full
+	// delay on each chunk (2 × 100ms = 200ms); honoring it ends the call at
+	// roughly req.Timeout (50ms), comfortably under the 180ms bound.
+	assert.Less(t, elapsed, 180*time.Millisecond,
+		"shared deadline must bound total wall-clock at req.Timeout, not N × per-chunk delay")
+}
+
+// TestAwaitSchemaApplied_FanOut_PeerUnimplemented_TreatedAsReady locks the
+// cross-version policy: a peer (or data node) that returns
+// codes.Unimplemented from GetKeyRevisions — i.e. a Phase-1 v0.11 / v0.12
+// node — is treated as ready (assume every key applied) so partial-upgrade
+// clusters do not deadlock barrier callers.
+func TestAwaitSchemaApplied_FanOut_PeerUnimplemented_TreatedAsReady(t *testing.T) {
+	keys := []*schemav1.SchemaKey{streamKey("g", "k0"), streamKey("g", "k1")}
+	cache := &staticBarrierCache{maxModRevision: 100, keys: map[string]int64{
+		"stream_g/k0": 100, "stream_g/k1": 100,
+	}}
+	legacy := &fakeNodeStatusClient{err: status.Error(codes.Unimplemented, "phase 1 node")}
+	tier1 := newFakeTier([]string{"phase1"}, map[string]clusterv1.NodeSchemaStatusServiceClient{
+		"phase1": legacy,
+	})
+	svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: newFakeTier(nil, nil), self: "self-liaison"}).build()
+
+	resp, err := svc.AwaitSchemaApplied(context.Background(), &schemav1.AwaitSchemaAppliedRequest{
+		Keys:    keys,
+		Timeout: durationpb.New(50 * time.Millisecond),
+	})
+	require.NoError(t, err)
+	assert.True(t, resp.GetApplied(),
+		"Unimplemented from a Phase-1 peer must not block AwaitSchemaApplied")
+	assert.Empty(t, resp.GetLaggards())
+}
