This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch phase-2-cp5-march in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 6731df5c0bb16b4e19f32a64eedd8727d0daf328 Author: Hongtao Gao <[email protected]> AuthorDate: Mon May 4 06:32:38 2026 +0000 feat(barrier): cluster fan-out for AwaitSchemaDeleted (Phase 2 §FD-1, §FD-2) Phase 2 Step 2.4 — extend the cluster barrier to the deletion path. AwaitSchemaDeleted now probes the frozen tier1 + tier2 + self watched set in parallel via GetAbsentKeys; the receiving liaison's own cache is read in-process. The §5.0 differences from AwaitSchemaApplied: per-member probe RPC (GetAbsentKeys instead of GetKeyRevisions) and decision rule ("every key absent on every member" instead of "present at or above target rev"). Membership snapshot, chunking, shared deadline, cross-version policy, and transient-error handling are identical and reuse the helpers introduced in §FA-1. Algorithm: - The per-iteration loop mirrors AwaitSchemaApplied's, and applyTransitionsApplied is reused (no duplication of eviction / leave / late-join logic). - Per-member probe (probeAbsentOne): chunks keys at barrierKeyChunkSize=1000 per RPC, aggregates StillPresentKeys across chunks. Self uses an in-process scan that mirrors Phase 1's collectPresentKeys. - Cross-version: codes.Unimplemented from a peer ⇒ ready (assume every key absent). Other gRPC errors ⇒ transient laggard for this iteration only. - Nil-cache fail-closed: self with nil cache reports every key still present so the loop keeps polling, mirroring the Phase 1 standalone contract. Dispatch: AwaitSchemaDeleted now checks peerLiaisons / dataNodes / selfName and routes to awaitSchemaDeletedCluster when wired (production); falls back to the legacy in-process loop otherwise. Tests (barrier_cluster_test.go): - fakeNodeStatusClient.GetAbsentKeys is now a real implementation (the Phase 2.2 panic stub is gone). It reuses fakeKeyState to partition the requested keys: a key found in the state with present=true is reported as still_present; every other key is reported absent.
- A new keyStateFn closure on the fake lets tests vary state per call (mirroring the mutatingRouteTable pattern from §SS-2..§SS-4) — needed for the SucceedsAfterAllNodesDrain regression. - TestAwaitSchemaDeleted_FanOut_ReportsStillPresent: per-node still_present laggards with role-prefixed identifiers. - TestAwaitSchemaDeleted_FanOut_SucceedsAfterAllNodesDrain: peer transitions from "both keys present" on call 1 to "both absent" on call 2; barrier converges with applied=true and zero laggards once both nodes drain. - TestAwaitSchemaDeleted_FanOut_MixedOldAndNewSchemasOnSameKey: after delete + recreate at higher rev, every member's GetAbsentKeys reports the key still_present (the recreated row is alive); barrier returns applied=false with the key surfaced as still_present per member. All 22 cluster fan-out tests pass; existing barrier_test.go (Phase 1 helpers + standalone path) stays green; lint clean. via [HAPI](https://hapi.run) --- banyand/liaison/grpc/barrier.go | 10 +- banyand/liaison/grpc/barrier_cluster.go | 172 +++++++++++++++++++++++++++ banyand/liaison/grpc/barrier_cluster_test.go | 146 ++++++++++++++++++++++- 3 files changed, 324 insertions(+), 4 deletions(-) diff --git a/banyand/liaison/grpc/barrier.go b/banyand/liaison/grpc/barrier.go index ec152b8ec..c4a606736 100644 --- a/banyand/liaison/grpc/barrier.go +++ b/banyand/liaison/grpc/barrier.go @@ -176,9 +176,15 @@ func (b *barrierService) AwaitSchemaApplied(ctx context.Context, req *schemav1.A } } -// AwaitSchemaDeleted blocks until all requested keys are absent from the cache, -// or the timeout elapses. +// AwaitSchemaDeleted blocks until all requested keys are absent from the +// cache, or the timeout elapses. When the cluster fan-out dependencies are +// wired (production), the call probes the frozen tier1 + tier2 + self +// watched set in parallel via GetAbsentKeys; without them (Phase 1 +// unit-test path), it falls back to a single in-process cache poll. 
func (b *barrierService) AwaitSchemaDeleted(ctx context.Context, req *schemav1.AwaitSchemaDeletedRequest) (*schemav1.AwaitSchemaDeletedResponse, error) { + if b.peerLiaisons != nil && b.dataNodes != nil && b.selfName != nil { + return b.awaitSchemaDeletedCluster(ctx, req) + } if len(req.GetKeys()) > barrierMaxKeys { return nil, status.Errorf(codes.InvalidArgument, "too many keys: max=%d", barrierMaxKeys) } diff --git a/banyand/liaison/grpc/barrier_cluster.go b/banyand/liaison/grpc/barrier_cluster.go index 8b14410d1..6370fbc92 100644 --- a/banyand/liaison/grpc/barrier_cluster.go +++ b/banyand/liaison/grpc/barrier_cluster.go @@ -529,6 +529,178 @@ func (b *barrierService) probeKeysOne(ctx context.Context, m member, keys []*sch return keyProbeResult{member: m, missingKeys: missing, ready: len(missing) == 0} } +// stillPresentResult is the per-iteration outcome of a GetAbsentKeys probe +// for one member during AwaitSchemaDeleted. stillPresent carries the keys +// the member's cache has not yet removed; ready=true when every requested +// key is absent (stillPresent is empty). +type stillPresentResult struct { + err error + stillPresent []*schemav1.SchemaKey + member member + ready bool +} + +// awaitSchemaDeletedCluster runs the cluster-wide fan-out for +// AwaitSchemaDeleted. Per the §5.0 differences table the only deltas from +// AwaitSchemaApplied are the per-member probe RPC (GetAbsentKeys instead of +// GetKeyRevisions) and the decision rule (`every key absent on every member` +// instead of `every key present at or above target rev`). Membership +// snapshot, chunking, shared deadline, cross-version, and transient-error +// policies are identical. 
+func (b *barrierService) awaitSchemaDeletedCluster(ctx context.Context, req *schemav1.AwaitSchemaDeletedRequest) (*schemav1.AwaitSchemaDeletedResponse, error) { + if len(req.GetKeys()) > barrierMaxKeys { + return nil, status.Errorf(codes.InvalidArgument, "too many keys: max=%d", barrierMaxKeys) + } + frozen := b.snapshotMembers() + if len(frozen) == 0 { + return nil, status.Errorf(codes.Unavailable, "no active cluster members") + } + + deadline := time.Now().Add(barrierDeadlineDuration(req.GetTimeout())) + pollCtx, cancel := context.WithDeadline(ctx, deadline) + defer cancel() + + alive := frozen + var evictedLaggards []*schemav1.NodeLaggard + interval := barrierInitInterval + var lastResults []stillPresentResult + for { + view := b.currentMembership() + var newlyEvicted []*schemav1.NodeLaggard + alive, newlyEvicted = applyTransitionsApplied(alive, view) + evictedLaggards = append(evictedLaggards, newlyEvicted...) + + if len(alive) == 0 { + return &schemav1.AwaitSchemaDeletedResponse{Applied: true, Laggards: evictedLaggards}, nil + } + + lastResults = b.probeAbsentKeys(pollCtx, alive, req.GetKeys(), deadline) + if allKeysAbsent(lastResults) { + return &schemav1.AwaitSchemaDeletedResponse{Applied: true, Laggards: evictedLaggards}, nil + } + if time.Now().After(deadline) { + return &schemav1.AwaitSchemaDeletedResponse{ + Applied: false, + Laggards: append(stillPresentLaggards(lastResults), evictedLaggards...), + }, nil + } + select { + case <-time.After(interval): + case <-pollCtx.Done(): + return &schemav1.AwaitSchemaDeletedResponse{ + Applied: false, + Laggards: append(stillPresentLaggards(lastResults), evictedLaggards...), + }, nil + } + interval = barrierBackoff(interval) + } +} + +// probeAbsentKeys runs one parallel iteration of GetAbsentKeys probes +// against the watched set, chunking keys at barrierKeyChunkSize with a +// shared call-wide deadline. 
+func (b *barrierService) probeAbsentKeys(ctx context.Context, members []member, keys []*schemav1.SchemaKey, deadline time.Time) []stillPresentResult { + results := make([]stillPresentResult, len(members)) + var wg sync.WaitGroup + probeCtx, cancel := context.WithDeadline(ctx, deadline) + defer cancel() + for i := range members { + wg.Add(1) + go func(idx int) { + defer wg.Done() + results[idx] = b.probeAbsentOne(probeCtx, members[idx], keys) + }(i) + } + wg.Wait() + return results +} + +// probeAbsentOne returns the per-member outcome for one iteration of the +// AwaitSchemaDeleted probe. +func (b *barrierService) probeAbsentOne(ctx context.Context, m member, keys []*schemav1.SchemaKey) stillPresentResult { + if m.isSelf { + c := b.cache() + if c == nil { + // Fail-closed: nil cache → claim every key still present so + // the loop keeps polling until the cache is online. Mirrors + // Phase 1's collectPresentKeys nil-cache contract. + present := make([]*schemav1.SchemaKey, len(keys)) + copy(present, keys) + return stillPresentResult{member: m, stillPresent: present} + } + var present []*schemav1.SchemaKey + for _, key := range keys { + propID := schemaKeyToPropID(key) + if _, ok := c.GetKeyModRevision(propID); ok { + present = append(present, key) + } + } + return stillPresentResult{member: m, stillPresent: present, ready: len(present) == 0} + } + + tier := b.peerLiaisons + if m.role == roleData { + tier = b.dataNodes + } + if tier == nil { + return stillPresentResult{member: m, err: errors.New("no tier client wired")} + } + client := tier() + if client == nil { + return stillPresentResult{member: m, err: errors.New("tier client unavailable")} + } + statusClient, err := client.NewNodeSchemaStatusClient(m.name) + if err != nil { + return stillPresentResult{member: m, err: err} + } + + var present []*schemav1.SchemaKey + for chunkStart := 0; chunkStart < len(keys); chunkStart += barrierKeyChunkSize { + chunkEnd := chunkStart + barrierKeyChunkSize + if chunkEnd > 
len(keys) { + chunkEnd = len(keys) + } + chunkKeys := keys[chunkStart:chunkEnd] + resp, rpcErr := statusClient.GetAbsentKeys(ctx, &clusterv1.GetAbsentKeysRequest{Keys: chunkKeys}) + if rpcErr != nil { + if status.Code(rpcErr) == codes.Unimplemented { + return stillPresentResult{member: m, ready: true} + } + return stillPresentResult{member: m, err: rpcErr} + } + present = append(present, resp.GetStillPresentKeys()...) + } + return stillPresentResult{member: m, stillPresent: present, ready: len(present) == 0} +} + +// allKeysAbsent reports whether every member's most recent probe found +// every requested key absent (i.e. removed from its cache). +func allKeysAbsent(results []stillPresentResult) bool { + for _, r := range results { + if !r.ready { + return false + } + } + return true +} + +// stillPresentLaggards builds the laggards list for the timeout response, +// each entry carrying the per-node still_present_keys observed in the +// final iteration. +func stillPresentLaggards(results []stillPresentResult) []*schemav1.NodeLaggard { + laggards := make([]*schemav1.NodeLaggard, 0) + for _, r := range results { + if r.ready { + continue + } + laggards = append(laggards, &schemav1.NodeLaggard{ + Node: r.member.laggardName(), + StillPresentKeys: r.stillPresent, + }) + } + return laggards +} + // allKeysApplied reports whether every member's most recent probe found // every key applied at or above its target revision. 
func allKeysApplied(results []keyProbeResult) bool { diff --git a/banyand/liaison/grpc/barrier_cluster_test.go b/banyand/liaison/grpc/barrier_cluster_test.go index 2921e6156..ce98f7c79 100644 --- a/banyand/liaison/grpc/barrier_cluster_test.go +++ b/banyand/liaison/grpc/barrier_cluster_test.go @@ -55,6 +55,7 @@ type fakeNodeStatusClient struct { err error callsRef *int32 keyState map[string]fakeKeyState + keyStateFn func() map[string]fakeKeyState keyRevsCalls *int32 revs []int64 maxRev int64 @@ -126,8 +127,37 @@ func keyStateKey(kind, group, name string) string { return kind + "|" + group + "|" + name } -func (*fakeNodeStatusClient) GetAbsentKeys(_ context.Context, _ *clusterv1.GetAbsentKeysRequest, _ ...grpc.CallOption) (*clusterv1.GetAbsentKeysResponse, error) { - panic("GetAbsentKeys: unused in Phase 2.2 tests") +func (f *fakeNodeStatusClient) GetAbsentKeys( + ctx context.Context, req *clusterv1.GetAbsentKeysRequest, _ ...grpc.CallOption, +) (*clusterv1.GetAbsentKeysResponse, error) { + if f.keyRevsCalls != nil { + atomic.AddInt32(f.keyRevsCalls, 1) + } + if f.keyRevsDelay > 0 { + select { + case <-time.After(f.keyRevsDelay): + case <-ctx.Done(): + return nil, ctx.Err() + } + } + if f.err != nil { + return nil, f.err + } + state := f.keyState + if f.keyStateFn != nil { + state = f.keyStateFn() + } + keys := req.GetKeys() + var absent, present []*schemav1.SchemaKey + for _, k := range keys { + mapKey := k.GetKind() + "|" + k.GetGroup() + "|" + k.GetName() + if s, ok := state[mapKey]; ok && s.present { + present = append(present, k) + continue + } + absent = append(absent, k) + } + return &clusterv1.GetAbsentKeysResponse{AbsentKeys: absent, StillPresentKeys: present}, nil } // fakeQueueClient embeds queue.Client (a nil interface) so unused methods @@ -687,3 +717,115 @@ func TestAwaitSchemaApplied_FanOut_PeerUnimplemented_TreatedAsReady(t *testing.T "Unimplemented from a Phase-1 peer must not block AwaitSchemaApplied") assert.Empty(t, resp.GetLaggards()) } + +// 
TestAwaitSchemaDeleted_FanOut_ReportsStillPresent verifies that the +// timeout response carries per-node still_present_keys with role-prefixed +// identifiers — exact mirror of the AwaitSchemaApplied missing-keys layout +// for the deletion barrier. +func TestAwaitSchemaDeleted_FanOut_ReportsStillPresent(t *testing.T) { + keys := []*schemav1.SchemaKey{ + streamKey("g", "k0"), + streamKey("g", "k1"), + streamKey("g", "k2"), + } + cache := &staticBarrierCache{maxModRevision: 100} // self has nothing → all absent + peerA := &fakeNodeStatusClient{keyState: map[string]fakeKeyState{ + keyStateKey("stream", "g", "k0"): presentAt(100), + keyStateKey("stream", "g", "k1"): presentAt(100), + }} // still_present: k0, k1 + peerB := &fakeNodeStatusClient{keyState: map[string]fakeKeyState{ + keyStateKey("stream", "g", "k2"): presentAt(100), + }} // still_present: k2 + tier1 := newFakeTier([]string{"peer-A", "peer-B"}, map[string]clusterv1.NodeSchemaStatusServiceClient{ + "peer-A": peerA, + "peer-B": peerB, + }) + svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: newFakeTier(nil, nil), self: "self-liaison"}).build() + + resp, err := svc.AwaitSchemaDeleted(context.Background(), &schemav1.AwaitSchemaDeletedRequest{ + Keys: keys, + Timeout: durationpb.New(60 * time.Millisecond), + }) + require.NoError(t, err) + assert.False(t, resp.GetApplied()) + require.Len(t, resp.GetLaggards(), 2) + + la := laggardByNode(resp.GetLaggards(), "liaison-peer-A") + require.NotNil(t, la, "laggard for peer-A must be present") + assert.Equal(t, []string{"k0", "k1"}, schemaKeyNames(la.GetStillPresentKeys())) + + lb := laggardByNode(resp.GetLaggards(), "liaison-peer-B") + require.NotNil(t, lb, "laggard for peer-B must be present") + assert.Equal(t, []string{"k2"}, schemaKeyNames(lb.GetStillPresentKeys())) +} + +// TestAwaitSchemaDeleted_FanOut_SucceedsAfterAllNodesDrain verifies that +// once every member reports the keys absent, applied=true is returned. 
The +// peer's keyState is mutated on the second probe call to simulate a drain +// completing mid-call (real production: the watch event finally landed). +func TestAwaitSchemaDeleted_FanOut_SucceedsAfterAllNodesDrain(t *testing.T) { + keys := []*schemav1.SchemaKey{streamKey("g", "k0"), streamKey("g", "k1")} + cache := &staticBarrierCache{maxModRevision: 100} // self drained from start + var calls int32 + peer := &fakeNodeStatusClient{ + keyRevsCalls: &calls, + keyStateFn: func() map[string]fakeKeyState { + // Call 1: both keys still present; call 2+: both drained. + if atomic.LoadInt32(&calls) <= 1 { + return map[string]fakeKeyState{ + keyStateKey("stream", "g", "k0"): presentAt(100), + keyStateKey("stream", "g", "k1"): presentAt(100), + } + } + return map[string]fakeKeyState{} + }, + } + tier1 := newFakeTier([]string{"peer"}, map[string]clusterv1.NodeSchemaStatusServiceClient{"peer": peer}) + svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: newFakeTier(nil, nil), self: "self-liaison"}).build() + + resp, err := svc.AwaitSchemaDeleted(context.Background(), &schemav1.AwaitSchemaDeletedRequest{ + Keys: keys, + Timeout: durationpb.New(200 * time.Millisecond), + }) + require.NoError(t, err) + assert.True(t, resp.GetApplied(), + "barrier must converge once peer reports both keys absent on the second probe") + assert.Empty(t, resp.GetLaggards()) + assert.GreaterOrEqual(t, atomic.LoadInt32(&calls), int32(2), + "the loop must have probed at least twice for the drain to be observed") +} + +// TestAwaitSchemaDeleted_FanOut_MixedOldAndNewSchemasOnSameKey verifies +// that after a delete + recreate at a higher revision, the absence probe +// recognizes the new live row on every member and returns applied=false +// with that key in still_present_keys (the new instance is alive — the +// caller's deletion request is no longer accurate for it). +// +// Setup: key "k0" was deleted then recreated at rev=200. peer's cache +// reports it Present with rev=200. 
Self's cache also reports it present. +// The barrier must therefore time out and surface the still-present key. +func TestAwaitSchemaDeleted_FanOut_MixedOldAndNewSchemasOnSameKey(t *testing.T) { + keys := []*schemav1.SchemaKey{streamKey("g", "k0")} + cache := &staticBarrierCache{ + maxModRevision: 200, + keys: map[string]int64{"stream_g/k0": 200}, // recreated at higher rev + } + peer := &fakeNodeStatusClient{keyState: map[string]fakeKeyState{ + keyStateKey("stream", "g", "k0"): presentAt(200), + }} + tier1 := newFakeTier([]string{"peer"}, map[string]clusterv1.NodeSchemaStatusServiceClient{"peer": peer}) + svc := (&clusterFixture{cache: cache, tier1: tier1, tier2: newFakeTier(nil, nil), self: "self-liaison"}).build() + + resp, err := svc.AwaitSchemaDeleted(context.Background(), &schemav1.AwaitSchemaDeletedRequest{ + Keys: keys, + Timeout: durationpb.New(40 * time.Millisecond), + }) + require.NoError(t, err) + assert.False(t, resp.GetApplied(), + "a recreated-at-higher-rev key is alive on every member; deletion barrier must NOT report applied=true") + require.NotEmpty(t, resp.GetLaggards()) + for _, l := range resp.GetLaggards() { + assert.Equal(t, []string{"k0"}, schemaKeyNames(l.GetStillPresentKeys()), + "every member's laggard entry must list k0 as still present") + } +}
