This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch phase-2-cp5-march in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 96d3932bea501f46138f45c299ea582531a88b07 Author: Hongtao Gao <[email protected]> AuthorDate: Wed May 6 15:34:28 2026 +0000 feat(liaison): write/query gate reads NodeRepoRegistry, in lockstep with AwaitXXX (Phase 2 Step 2.5 §GC-1) Closes the residual gate↔barrier divergence the prior commit only addressed on the AwaitXXX side. validateWriteRequest (measure / stream / trace) and the per-group query gate (checkQueryGate caller for the same three services) now resolve cacheRev through the per-node NodeRepoRegistry, mirroring the view AwaitRevisionApplied / AwaitSchemaApplied / AwaitSchemaDeleted consult after Step 2.5 §2. The locator (entityRepo.getLocator / getTrace) still answers the write gate's existence check (STATUS_NOT_FOUND signal) and downstream navigation. In the query gate, a key the registry already holds counts as found even if the locator lags; otherwise the locator's existence verdict is kept. Only the revision scalar moves to the registry. When the registry is authoritative for the kind but does not yet hold the key, the helper returns 0, so the bounded awaitRevisionReached fires; if it never catches up, the gate returns STATUS_SCHEMA_NOT_APPLIED — the same shape clients already expect. Routing pinned in banyand/liaison/grpc/schema_revision_registry.go: resolveSchemaRevision(reg, kind, group, name, fallback) int64 resolveQueryGateRevision(reg, kind, group, name, locRev, locExists) (int64, bool) Net contract: if AwaitRevisionApplied(R) on this liaison returns applied=true, the write gate, query gate, and downstream executor on the same node all see ≥ R for any key included in R. The §4.6.2 / §6.8 / §6.11 / §4.6.4 second-run flake — gate falsely succeeds, executor misses LoadGroup — cannot recur on a node where metadata.Service is wired. Adds banyand/liaison/grpc/schema_revision_registry_test.go covering registry lag, registry advance during await, registry-equals-client even when locator lags, no-registry fall-through, locator-existence preservation in the query gate, and the helper's edge cases. Stream and trace counterparts mirror the measure case. 
Distributed verification (combined with Step 2.5 §1+§2): Will run 28 of 28 specs Ran 28 of 28 Specs in 90.849 seconds SUCCESS! -- 28 Passed | 0 Failed | 0 Pending | 0 Skipped via [HAPI](https://hapi.run) Co-Authored-By: HAPI <[email protected]> Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]> --- banyand/liaison/grpc/measure.go | 21 +- banyand/liaison/grpc/schema_revision_registry.go | 92 +++++ .../liaison/grpc/schema_revision_registry_test.go | 383 +++++++++++++++++++++ banyand/liaison/grpc/stream.go | 19 +- banyand/liaison/grpc/trace.go | 20 +- 5 files changed, 517 insertions(+), 18 deletions(-) diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go index 6017d8698..fd3aaaba9 100644 --- a/banyand/liaison/grpc/measure.go +++ b/banyand/liaison/grpc/measure.go @@ -32,6 +32,7 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/accesslog" "github.com/apache/skywalking-banyandb/pkg/bus" @@ -197,8 +198,15 @@ func (ms *measureService) validateWriteRequest(writeRequest *measurev1.WriteRequ ms.sendReply(metadata, modelv1.Status_STATUS_NOT_FOUND, writeRequest.GetMessageId(), measure) return modelv1.Status_STATUS_NOT_FOUND } + // Read the cache revision from the per-node NodeRepoRegistry — the + // same view AwaitRevisionApplied / AwaitSchemaApplied consult — so the + // gate's verdict is in lockstep with the barrier on this node and the + // data-node executor cannot miss a key the gate just certified. The + // locator's ModRevision is the fallback when metadataRepo exposes no + // NodeRepoRegistry or that registry does not track KindMeasure. 
+ reg := schemaRevisionRegistry(ms.metadataRepo) clientRev := metadata.ModRevision - cacheRev := measureCache.ModRevision + cacheRev := resolveSchemaRevision(reg, schema.KindMeasure, metadata.GetGroup(), metadata.GetName(), measureCache.ModRevision) if clientRev < cacheRev { ms.l.Error().Stringer("written", writeRequest).Msg("the measure schema is expired") ms.sendReply(metadata, modelv1.Status_STATUS_EXPIRED_SCHEMA, writeRequest.GetMessageId(), measure) @@ -206,11 +214,11 @@ func (ms *measureService) validateWriteRequest(writeRequest *measurev1.WriteRequ } if clientRev > cacheRev { reached := awaitRevisionReached(func() int64 { - loc, ok := ms.entityRepo.getLocator(id) - if !ok { - return 0 + fallback := int64(0) + if loc, ok := ms.entityRepo.getLocator(id); ok { + fallback = loc.ModRevision } - return loc.ModRevision + return resolveSchemaRevision(reg, schema.KindMeasure, metadata.GetGroup(), metadata.GetName(), fallback) }, clientRev, ms.maxWaitDuration) if !reached { ms.sendReply(metadata, modelv1.Status_STATUS_SCHEMA_NOT_APPLIED, writeRequest.GetMessageId(), measure) @@ -436,10 +444,11 @@ func (ms *measureService) Query(ctx context.Context, req *measurev1.QueryRequest } }() } + measureReg := schemaRevisionRegistry(ms.metadataRepo) gatedStatuses, shortCircuit := checkQueryGate(req.Groups, req.Name, req.GroupModRevisions, func(name, group string) (int64, bool) { loc, ok := ms.entityRepo.getLocator(identity{name: name, group: group}) - return loc.ModRevision, ok + return resolveQueryGateRevision(measureReg, schema.KindMeasure, group, name, loc.ModRevision, ok) }, ms.maxWaitDuration) if shortCircuit { return &measurev1.QueryResponse{GroupStatuses: gatedStatuses}, nil diff --git a/banyand/liaison/grpc/schema_revision_registry.go b/banyand/liaison/grpc/schema_revision_registry.go new file mode 100644 index 000000000..0a55205a5 --- /dev/null +++ b/banyand/liaison/grpc/schema_revision_registry.go @@ -0,0 +1,92 @@ +// Licensed to Apache Software Foundation (ASF) under 
one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package grpc + +import ( + "github.com/apache/skywalking-banyandb/banyand/metadata" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/pkg/schema/registry" +) + +// nodeRepoRegistryProvider is the minimum surface schemaRevisionRegistry +// needs from a metadata.Repo. metadata.Service satisfies it in production; +// declaring it locally keeps the helper independent of the wider Service +// interface and lets unit tests inject a registry without stubbing every +// metadata.Service method. +type nodeRepoRegistryProvider interface { + NodeRepoRegistry() *registry.NodeRepoRegistry +} + +// schemaRevisionRegistry returns the per-node NodeRepoRegistry the repo +// exposes, or nil when repo (even a nil interface) is not a provider, e.g. +// legacy unit-test fixtures; the resolve helpers then use locator values. +func schemaRevisionRegistry(repo metadata.Repo) *registry.NodeRepoRegistry { + p, ok := repo.(nodeRepoRegistryProvider) + if !ok { + return nil + } + return p.NodeRepoRegistry() +} + +// resolveSchemaRevision answers the write gate's "what mod_revision is this +// node at for (kind, group, name)?" question. 
+// +// The registry is authoritative for kinds tracked by at least one per-service +// schemaRepo (Group/Stream/Measure/Trace/IndexRule/IndexRuleBinding) — the +// same caches the executor consults via LoadGroup / LoadResource. Reading +// from the registry instead of the entityRepo locator closes the gap between +// AwaitXXX (which reads the registry view) and the write gate's verdict on +// the same node, so the executor cannot miss a key the gate just certified. +// +// When the registry is authoritative for the kind but does NOT hold the key, +// the function returns 0 so the three-way write-gate split treats any +// request with clientRev > 0 as ahead-of-cache and the bounded await runs. +// +// When reg is nil (the metadata.Repo exposes no NodeRepoRegistry, e.g. a +// legacy unit-test mock) or the registry does not track the kind (no +// per-service schemaRepo registered for it), the caller-provided fallback +// (typically the locator's ModRevision) is returned unchanged. +func resolveSchemaRevision(reg *registry.NodeRepoRegistry, kind schema.Kind, group, name string, fallback int64) int64 { + if reg == nil || !reg.HasKind(kind) { + return fallback + } + if rev, ok := reg.ResourceRevision(kind, group, name); ok { + return rev + } + return 0 +} + +// resolveQueryGateRevision answers the query gate's +// `getLocatorRevision(name, group)` callback as a (cacheRev, found) pair. +// +// Routing matches resolveSchemaRevision: registry-tracked kinds read through +// the per-service schemaRepo aggregator; un-tracked kinds (or a nil reg) get +// the locator's pair unchanged. For tracked kinds `found` is true whenever the +// registry holds the key, even if the locator lags; otherwise it mirrors the +// locator's existence so the STATUS_NOT_FOUND vs SCHEMA_NOT_APPLIED contract +// holds — a resource the locator knows but the registry has not applied yet +// yields SCHEMA_NOT_APPLIED (after the bounded await), not NOT_FOUND. 
+func resolveQueryGateRevision(reg *registry.NodeRepoRegistry, kind schema.Kind, group, name string, locatorRev int64, locatorExists bool) (int64, bool) { + if reg == nil || !reg.HasKind(kind) { + return locatorRev, locatorExists + } + if rev, ok := reg.ResourceRevision(kind, group, name); ok { + return rev, true + } + return 0, locatorExists +} diff --git a/banyand/liaison/grpc/schema_revision_registry_test.go b/banyand/liaison/grpc/schema_revision_registry_test.go new file mode 100644 index 000000000..3a17de4e5 --- /dev/null +++ b/banyand/liaison/grpc/schema_revision_registry_test.go @@ -0,0 +1,383 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package grpc + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" + tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/pkg/schema/registry" +) + +// fakeRevisionRepo implements registry.RevisionRepository with a controllable +// per-key map and an atomically advancing latestModRevision watermark. Tests +// use it to simulate the eventCh-retry leak the gate must close: a state +// where the entityRepo locator already advanced to R but the schemaRepo (and +// therefore the registry) has not — in production this is the data-node +// executor lag the cluster barrier already accounts for via NodeRepoRegistry. +type fakeRevisionRepo struct { + keys map[string]int64 + latest atomic.Int64 + mu sync.RWMutex +} + +func newFakeRevisionRepo() *fakeRevisionRepo { + return &fakeRevisionRepo{keys: make(map[string]int64)} +} + +func (f *fakeRevisionRepo) keyOf(kind schema.Kind, group, name string) string { + return kind.String() + "|" + group + "|" + name +} + +func (f *fakeRevisionRepo) set(kind schema.Kind, group, name string, rev int64) { + f.mu.Lock() + f.keys[f.keyOf(kind, group, name)] = rev + f.mu.Unlock() + for { + cur := f.latest.Load() + if rev <= cur || f.latest.CompareAndSwap(cur, rev) { + return + } + } +} + +// LatestModRevision implements registry.RevisionRepository. 
+func (f *fakeRevisionRepo) LatestModRevision() int64 { return f.latest.Load() } + +// ResourceRevision implements registry.RevisionRepository. +func (f *fakeRevisionRepo) ResourceRevision(kind schema.Kind, group, name string) (int64, bool) { + f.mu.RLock() + defer f.mu.RUnlock() + rev, ok := f.keys[f.keyOf(kind, group, name)] + return rev, ok +} + +// IsAbsent implements registry.RevisionRepository. +func (f *fakeRevisionRepo) IsAbsent(kind schema.Kind, group, name string) bool { + _, ok := f.ResourceRevision(kind, group, name) + return !ok +} + +// fakeRepoWithRegistry satisfies metadata.Repo via an embedded nil interface +// and adds NodeRepoRegistry so the gate's schemaRevisionRegistry helper can +// pick up the per-node registry without forcing the test to stub every +// metadata.Service method. Any metadata.Repo method call would panic on the +// nil embed; the gate path under test never makes one. +type fakeRepoWithRegistry struct { + metadata.Repo + reg *registry.NodeRepoRegistry +} + +func (f *fakeRepoWithRegistry) NodeRepoRegistry() *registry.NodeRepoRegistry { return f.reg } + +// newRepoWithRegisteredFake builds a metadata.Repo whose NodeRepoRegistry has +// `repo` bound under `kinds`. Returns the wired metadata.Repo so it can be +// dropped into discoveryService.metadataRepo for gate tests. +func newRepoWithRegisteredFake(kinds schema.Kind, repo *fakeRevisionRepo) metadata.Repo { + reg := registry.NewNodeRepoRegistry() + reg.Register(kinds, repo) + return &fakeRepoWithRegistry{reg: reg} +} + +// withMetadataRepoMeasure / withMetadataRepoStream / withMetadataRepoTrace +// wire svc.discoveryService.metadataRepo to repo and return svc. The helpers +// in measure_write_test.go / stream_write_test.go / trace_write_test.go leave +// the field nil; overwriting it makes the gate consult the registry. 
+func withMetadataRepoMeasure(svc *measureService, repo metadata.Repo) *measureService { + svc.discoveryService.metadataRepo = repo + return svc +} + +func withMetadataRepoStream(svc *streamService, repo metadata.Repo) *streamService { + svc.discoveryService.metadataRepo = repo + return svc +} + +func withMetadataRepoTrace(svc *traceService, repo metadata.Repo) *traceService { + svc.discoveryService.metadataRepo = repo + return svc +} + +// --- Write gate ---------------------------------------------------------. + +// TestWriteGate_Measure_RegistryLagsLocator_GateRunsBoundedAwait reproduces the +// eventCh-retry leak: the entityRepo locator advanced to R but the +// schemaRepo-backed registry did not, so the bare locator-only gate would +// falsely succeed at R. With the registry-aware gate the request is treated +// as ahead-of-cache; the bounded await fires; with the registry never +// advancing, the gate returns STATUS_SCHEMA_NOT_APPLIED. +func TestWriteGate_Measure_RegistryLagsLocator_GateRunsBoundedAwait(t *testing.T) { + id := identity{group: "g", name: "m"} + er := seededLocatorRepo(id) // locator at seededLocatorRepoRev (=100) + repo := newFakeRevisionRepo() + repo.set(schema.KindMeasure, id.group, id.name, seededLocatorRepoRev-1) // registry one rev behind + + repoWithReg := newRepoWithRegisteredFake(schema.KindMeasure, repo) + svc := withMetadataRepoMeasure(newTestMeasureService(er, 50*time.Millisecond), repoWithReg) + mock := &mockBidiServer[measurev1.WriteRequest, measurev1.WriteResponse]{} + + meta := &commonv1.Metadata{Group: id.group, Name: id.name, ModRevision: seededLocatorRepoRev} + st := svc.validateWriteRequest(validMeasureWriteRequest(), meta, mock) + + assert.Equal(t, modelv1.Status_STATUS_SCHEMA_NOT_APPLIED, st) + require.Len(t, mock.replies, 1) + assert.Equal(t, modelv1.Status_STATUS_SCHEMA_NOT_APPLIED.String(), mock.replies[0].Status) +} + +// TestWriteGate_Measure_RegistryAdvancesDuringAwait_GateSucceeds asserts the +// registry-aware 
gate succeeds when the schemaRepo catches up inside the +// bounded await window — the "watch event finally landed before timeout" +// case the cluster barrier already handles for AwaitRevisionApplied. +func TestWriteGate_Measure_RegistryAdvancesDuringAwait_GateSucceeds(t *testing.T) { + id := identity{group: "g", name: "m"} + er := seededLocatorRepo(id) + repo := newFakeRevisionRepo() + repo.set(schema.KindMeasure, id.group, id.name, seededLocatorRepoRev-1) + + repoWithReg := newRepoWithRegisteredFake(schema.KindMeasure, repo) + svc := withMetadataRepoMeasure(newTestMeasureService(er, 500*time.Millisecond), repoWithReg) + mock := &mockBidiServer[measurev1.WriteRequest, measurev1.WriteResponse]{} + + go func() { + time.Sleep(20 * time.Millisecond) + repo.set(schema.KindMeasure, id.group, id.name, seededLocatorRepoRev) + }() + + meta := &commonv1.Metadata{Group: id.group, Name: id.name, ModRevision: seededLocatorRepoRev} + st := svc.validateWriteRequest(validMeasureWriteRequest(), meta, mock) + + assert.Equal(t, modelv1.Status_STATUS_SUCCEED, st) + assert.Empty(t, mock.replies, "no error reply should be sent when the registry catches up in time") +} + +// TestWriteGate_Measure_RegistryEqualsClient_GateSucceeds_EvenWhenLocatorLags +// asserts that when the registry already holds the client's revision R, the +// gate accepts the write even though the locator is still at R-1 — a +// locator-only gate would time out with SCHEMA_NOT_APPLIED here. Inverse of +// the leak: if the schemaRepo says R, the executor will see R. 
+func TestWriteGate_Measure_RegistryEqualsClient_GateSucceeds_EvenWhenLocatorLags(t *testing.T) { + id := identity{group: "g", name: "m"} + er := seededLocatorRepo(id) // locator at seededLocatorRepoRev = R-1 + repo := newFakeRevisionRepo() + repo.set(schema.KindMeasure, id.group, id.name, seededLocatorRepoRev+1) // registry at R + + repoWithReg := newRepoWithRegisteredFake(schema.KindMeasure, repo) + svc := withMetadataRepoMeasure(newTestMeasureService(er, 50*time.Millisecond), repoWithReg) + mock := &mockBidiServer[measurev1.WriteRequest, measurev1.WriteResponse]{} + + meta := &commonv1.Metadata{Group: id.group, Name: id.name, ModRevision: seededLocatorRepoRev + 1} // client at R + st := svc.validateWriteRequest(validMeasureWriteRequest(), meta, mock) + + assert.Equal(t, modelv1.Status_STATUS_SUCCEED, st) + assert.Empty(t, mock.replies, "the registry, not the lagging locator, must decide the verdict") +} + +// TestWriteGate_Measure_NoRegistry_FallsBackToLocator asserts the existing +// behavior is preserved on metadata.Repo values that do not expose +// NodeRepoRegistry — the locator's ModRevision is the comparison source and +// the prior test fixtures (see measure_write_test.go) still hold. 
+func TestWriteGate_Measure_NoRegistry_FallsBackToLocator(t *testing.T) { + id := identity{group: "g", name: "m"} + er := seededLocatorRepo(id) // locator at 100 + svc := newTestMeasureService(er, 50*time.Millisecond) + mock := &mockBidiServer[measurev1.WriteRequest, measurev1.WriteResponse]{} + + meta := &commonv1.Metadata{Group: id.group, Name: id.name, ModRevision: seededLocatorRepoRev} + st := svc.validateWriteRequest(validMeasureWriteRequest(), meta, mock) + + assert.Equal(t, modelv1.Status_STATUS_SUCCEED, st) + assert.Empty(t, mock.replies) +} + +func TestWriteGate_Stream_RegistryLagsLocator_GateRunsBoundedAwait(t *testing.T) { + id := identity{group: "g", name: "s"} + er := seededLocatorRepo(id) + repo := newFakeRevisionRepo() + repo.set(schema.KindStream, id.group, id.name, seededLocatorRepoRev-1) + + repoWithReg := newRepoWithRegisteredFake(schema.KindStream, repo) + svc := withMetadataRepoStream(newTestStreamService(er, 50*time.Millisecond), repoWithReg) + mock := &mockBidiServer[streamv1.WriteRequest, streamv1.WriteResponse]{} + + meta := &commonv1.Metadata{Group: id.group, Name: id.name, ModRevision: seededLocatorRepoRev} + st := svc.validateWriteRequest(validStreamWriteRequest(), meta, mock) + + assert.Equal(t, modelv1.Status_STATUS_SCHEMA_NOT_APPLIED, st) +} + +func TestWriteGate_Trace_RegistryLagsTrace_GateRunsBoundedAwait(t *testing.T) { + id := identity{group: "g", name: "tr"} + er := seededTraceRepo(id, seededLocatorRepoRev) + repo := newFakeRevisionRepo() + repo.set(schema.KindTrace, id.group, id.name, seededLocatorRepoRev-1) + + repoWithReg := newRepoWithRegisteredFake(schema.KindTrace, repo) + svc := withMetadataRepoTrace(newTestTraceService(er, 50*time.Millisecond), repoWithReg) + mock := &mockBidiServer[tracev1.WriteRequest, tracev1.WriteResponse]{} + + meta := &commonv1.Metadata{Group: id.group, Name: id.name, ModRevision: seededLocatorRepoRev} + st := svc.validateWriteRequest(validTraceWriteRequest(), meta, nil, mock) + + assert.Equal(t, 
modelv1.Status_STATUS_SCHEMA_NOT_APPLIED, st) +} + +// --- Query gate ---------------------------------------------------------. + +// TestQueryGate_Measure_RegistryLag_ReportsSchemaNotApplied: per-group gate +// receives a client revision that the locator says is satisfied but the +// registry says is one behind. The receiving liaison must report +// STATUS_SCHEMA_NOT_APPLIED so a follow-up routed through this liaison sees +// the same verdict the data-node executor would produce on a forwarded +// sub-query. +func TestQueryGate_Measure_RegistryLag_ReportsSchemaNotApplied(t *testing.T) { + id := identity{group: "g", name: "m"} + er := seededLocatorRepo(id) // locator at 100 + repo := newFakeRevisionRepo() + repo.set(schema.KindMeasure, id.group, id.name, seededLocatorRepoRev-1) + + repoWithReg := newRepoWithRegisteredFake(schema.KindMeasure, repo) + measureReg := schemaRevisionRegistry(repoWithReg) + require.NotNil(t, measureReg) + + statuses, shortCircuit := checkQueryGate( + []string{id.group}, + id.name, + map[string]int64{id.group: seededLocatorRepoRev}, + func(name, group string) (int64, bool) { + loc, ok := er.getLocator(identity{name: name, group: group}) + return resolveQueryGateRevision(measureReg, schema.KindMeasure, group, name, loc.ModRevision, ok) + }, + 50*time.Millisecond, + ) + + assert.Equal(t, modelv1.Status_STATUS_SCHEMA_NOT_APPLIED, statuses[id.group]) + assert.True(t, shortCircuit) +} + +// TestQueryGate_Measure_RegistryEqual_ReportsSucceed: registry holds the +// client's revision; gate succeeds without short-circuit even though the +// locator has no entry at all (inverse of the leak). 
+func TestQueryGate_Measure_RegistryEqual_ReportsSucceed(t *testing.T) { + id := identity{group: "g", name: "m"} + er := newEmptyEntityRepo() // locator missing + repo := newFakeRevisionRepo() + repo.set(schema.KindMeasure, id.group, id.name, seededLocatorRepoRev) + + repoWithReg := newRepoWithRegisteredFake(schema.KindMeasure, repo) + measureReg := schemaRevisionRegistry(repoWithReg) + + statuses, shortCircuit := checkQueryGate( + []string{id.group}, + id.name, + map[string]int64{id.group: seededLocatorRepoRev}, + func(name, group string) (int64, bool) { + loc, ok := er.getLocator(identity{name: name, group: group}) + return resolveQueryGateRevision(measureReg, schema.KindMeasure, group, name, loc.ModRevision, ok) + }, + 50*time.Millisecond, + ) + + assert.Equal(t, modelv1.Status_STATUS_SUCCEED, statuses[id.group]) + assert.False(t, shortCircuit) +} + +// TestQueryGate_Measure_RegistryAuthoritative_BothMissing_ReportsNotFound: +// when the registry is authoritative for the kind, the locator is missing, +// and the registry has no entry, the gate returns STATUS_NOT_FOUND — not +// SCHEMA_NOT_APPLIED — so callers get the same shape they got pre-Option-C +// for an unknown resource. +func TestQueryGate_Measure_RegistryAuthoritative_BothMissing_ReportsNotFound(t *testing.T) { + id := identity{group: "g", name: "m"} + er := newEmptyEntityRepo() + repo := newFakeRevisionRepo() + // repo registered but holds no entry for (KindMeasure, g, m); locator + // also empty. Registry is authoritative for KindMeasure and reports + // absent → resolveQueryGateRevision should mirror locator existence. 
+ + repoWithReg := newRepoWithRegisteredFake(schema.KindMeasure, repo) + measureReg := schemaRevisionRegistry(repoWithReg) + + statuses, shortCircuit := checkQueryGate( + []string{id.group}, + id.name, + map[string]int64{id.group: seededLocatorRepoRev}, + func(name, group string) (int64, bool) { + loc, ok := er.getLocator(identity{name: name, group: group}) + return resolveQueryGateRevision(measureReg, schema.KindMeasure, group, name, loc.ModRevision, ok) + }, + 50*time.Millisecond, + ) + + assert.Equal(t, modelv1.Status_STATUS_NOT_FOUND, statuses[id.group]) + assert.True(t, shortCircuit) +} + +// --- Helper-level tests --------------------------------------------------. + +func TestResolveSchemaRevision_RegistryNotTrackingKind_ReturnsFallback(t *testing.T) { + repo := newFakeRevisionRepo() + // Register the fake against KindStream only; KindMeasure is un-tracked. + reg := registry.NewNodeRepoRegistry() + reg.Register(schema.KindStream, repo) + + got := resolveSchemaRevision(reg, schema.KindMeasure, "g", "m", seededLocatorRepoRev) + assert.Equal(t, seededLocatorRepoRev, got, "un-tracked kinds should return the caller-provided fallback") +} + +func TestResolveSchemaRevision_RegistryAuthoritative_KeyAbsent_ReturnsZero(t *testing.T) { + repo := newFakeRevisionRepo() + reg := registry.NewNodeRepoRegistry() + reg.Register(schema.KindMeasure, repo) + // No key set on repo. 
+ + got := resolveSchemaRevision(reg, schema.KindMeasure, "g", "m", seededLocatorRepoRev) + assert.Equal(t, int64(0), got, "registry authoritative for kind but key absent should yield 0 so the bounded await runs") +} + +func TestResolveSchemaRevision_NilRegistry_ReturnsFallback(t *testing.T) { + got := resolveSchemaRevision(nil, schema.KindMeasure, "g", "m", seededLocatorRepoRev) + assert.Equal(t, seededLocatorRepoRev, got) +} + +func TestResolveQueryGateRevision_RegistryAuthoritative_LocatorPresent_KeyAbsent_PreservesExistence(t *testing.T) { + repo := newFakeRevisionRepo() + reg := registry.NewNodeRepoRegistry() + reg.Register(schema.KindMeasure, repo) + + rev, found := resolveQueryGateRevision(reg, schema.KindMeasure, "g", "m", seededLocatorRepoRev, true) + assert.Equal(t, int64(0), rev) + assert.True(t, found, "found must mirror locator existence so the gate emits SCHEMA_NOT_APPLIED, not NOT_FOUND") +} + +func TestSchemaRevisionRegistry_NotProvider_ReturnsNil(t *testing.T) { + // metadata.Repo nil interface — the type assertion should fail and yield nil. 
+ var repo metadata.Repo + assert.Nil(t, schemaRevisionRegistry(repo)) +} diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go index 4c4316915..34beea76b 100644 --- a/banyand/liaison/grpc/stream.go +++ b/banyand/liaison/grpc/stream.go @@ -32,6 +32,7 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/accesslog" "github.com/apache/skywalking-banyandb/pkg/bus" @@ -104,8 +105,13 @@ func (s *streamService) validateWriteRequest(writeEntity *streamv1.WriteRequest, s.sendReply(metadata, modelv1.Status_STATUS_NOT_FOUND, writeEntity.GetMessageId(), stream) return modelv1.Status_STATUS_NOT_FOUND } + // See banyand/liaison/grpc/measure.go for the rationale: the gate + // reads through the per-node NodeRepoRegistry so its verdict tracks + // the same cache the AwaitXXX barrier and the data-node executor + // consult; locator ModRevision is the fallback (no registry / KindStream untracked). 
+ reg := schemaRevisionRegistry(s.metadataRepo) clientRev := metadata.ModRevision - cacheRev := streamCache.ModRevision + cacheRev := resolveSchemaRevision(reg, schema.KindStream, metadata.GetGroup(), metadata.GetName(), streamCache.ModRevision) if clientRev < cacheRev { s.l.Error().Stringer("written", writeEntity).Msg("the stream schema is expired") s.sendReply(metadata, modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream) @@ -113,11 +119,11 @@ func (s *streamService) validateWriteRequest(writeEntity *streamv1.WriteRequest, } if clientRev > cacheRev { reached := awaitRevisionReached(func() int64 { - loc, ok := s.entityRepo.getLocator(id) - if !ok { - return 0 + fallback := int64(0) + if loc, ok := s.entityRepo.getLocator(id); ok { + fallback = loc.ModRevision } - return loc.ModRevision + return resolveSchemaRevision(reg, schema.KindStream, metadata.GetGroup(), metadata.GetName(), fallback) }, clientRev, s.maxWaitDuration) if !reached { s.sendReply(metadata, modelv1.Status_STATUS_SCHEMA_NOT_APPLIED, writeEntity.GetMessageId(), stream) @@ -403,10 +409,11 @@ func (s *streamService) Query(ctx context.Context, req *streamv1.QueryRequest) ( } }() } + streamReg := schemaRevisionRegistry(s.metadataRepo) gatedStatuses, shortCircuit := checkQueryGate(req.Groups, req.Name, req.GroupModRevisions, func(name, group string) (int64, bool) { loc, ok := s.entityRepo.getLocator(identity{name: name, group: group}) - return loc.ModRevision, ok + return resolveQueryGateRevision(streamReg, schema.KindStream, group, name, loc.ModRevision, ok) }, s.maxWaitDuration) if shortCircuit { return &streamv1.QueryResponse{GroupStatuses: gatedStatuses}, nil diff --git a/banyand/liaison/grpc/trace.go b/banyand/liaison/grpc/trace.go index 5d3164590..75b0b6b9a 100644 --- a/banyand/liaison/grpc/trace.go +++ b/banyand/liaison/grpc/trace.go @@ -32,6 +32,7 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/accesslog" "github.com/apache/skywalking-banyandb/pkg/bus" @@ -145,8 +146,14 @@ func (s *traceService) validateWriteRequest(writeEntity *tracev1.WriteRequest, } if metadata.ModRevision > 0 { + // See banyand/liaison/grpc/measure.go for the rationale: the gate + // reads through the per-node NodeRepoRegistry so its verdict tracks + // the same cache the AwaitXXX barrier and the data-node executor + // consult; the entityRepo trace's ModRevision is the fallback when + // metadataRepo exposes no NodeRepoRegistry or KindTrace is untracked. + reg := schemaRevisionRegistry(s.metadataRepo) clientRev := metadata.ModRevision - cacheRev := traceEntity.GetMetadata().GetModRevision() + cacheRev := resolveSchemaRevision(reg, schema.KindTrace, metadata.GetGroup(), metadata.GetName(), traceEntity.GetMetadata().GetModRevision()) if clientRev < cacheRev { s.l.Error().Stringer("written", writeEntity).Msg("the trace schema is expired") s.sendReply(metadata, modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetVersion(), stream) @@ -154,11 +161,11 @@ func (s *traceService) validateWriteRequest(writeEntity *tracev1.WriteRequest, } if clientRev > cacheRev { reached := awaitRevisionReached(func() int64 { - e, ok := s.entityRepo.getTrace(id) - if !ok { - return 0 + fallback := int64(0) + if e, ok := s.entityRepo.getTrace(id); ok { + fallback = e.GetMetadata().GetModRevision() } - return e.GetMetadata().GetModRevision() + return resolveSchemaRevision(reg, schema.KindTrace, metadata.GetGroup(), metadata.GetName(), fallback) }, clientRev, s.maxWaitDuration) if !reached { s.sendReply(metadata, modelv1.Status_STATUS_SCHEMA_NOT_APPLIED, writeEntity.GetVersion(), stream) @@ -521,10 +528,11 @@ func (s 
*traceService) Query(ctx context.Context, req *tracev1.QueryRequest) (re } }() } + traceReg := schemaRevisionRegistry(s.metadataRepo) gatedStatuses, shortCircuit := checkQueryGate(req.Groups, req.Name, req.GroupModRevisions, func(name, group string) (int64, bool) { loc, ok := s.entityRepo.getLocator(identity{name: name, group: group}) - return loc.ModRevision, ok + return resolveQueryGateRevision(traceReg, schema.KindTrace, group, name, loc.ModRevision, ok) }, s.maxWaitDuration) if shortCircuit { return &tracev1.QueryResponse{GroupStatuses: gatedStatuses}, nil
